This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 935aece Reduce handle connect event time in the access log module
(#174)
935aece is described below
commit 935aece3a460274b075b8211051d61e18bf0fc7d
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 30 19:05:06 2024 +0800
Reduce handle connect event time in the access log module (#174)
---
CHANGES.md | 1 +
bpf/accesslog/common/connection.h | 3 +-
bpf/accesslog/common/data_args.h | 6 +-
bpf/accesslog/syscalls/connect_conntrack.c | 5 +-
bpf/accesslog/syscalls/connect_conntrack.h | 2 +-
go.mod | 4 +-
go.sum | 11 +-
pkg/accesslog/collector/connection.go | 79 ++++----
pkg/accesslog/collector/protocols/connection.go | 1 -
pkg/accesslog/collector/protocols/queue.go | 33 +---
pkg/accesslog/common/connection.go | 214 +++++----------------
pkg/accesslog/common/filter.go | 5 +
pkg/accesslog/events/close.go | 4 +-
pkg/accesslog/events/connect.go | 4 +-
pkg/accesslog/events/data.go | 8 +-
pkg/accesslog/events/detail.go | 4 +-
pkg/accesslog/events/events_test.go | 16 +-
pkg/accesslog/events/ztunnel.go | 6 +-
pkg/process/api.go | 2 +
pkg/process/finders/kubernetes/finder.go | 58 +++++-
pkg/process/finders/manager.go | 9 +
pkg/process/module.go | 9 +
pkg/profiling/task/network/analyze/events/data.go | 4 +
.../task/network/analyze/layer7/events.go | 4 +-
pkg/tools/btf/linker.go | 63 ++++--
pkg/tools/btf/queue.go | 74 -------
pkg/tools/btf/{reader => }/reader.go | 75 ++++++--
pkg/tools/buffer/buffer.go | 85 ++++----
pkg/tools/ip.go | 24 ++-
pkg/tools/ip/conntrack.go | 17 +-
pkg/tools/ip/tcpresolver.go | 2 +
31 files changed, 399 insertions(+), 433 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3d54791..16fc6fd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -21,6 +21,7 @@ Release Notes.
* Add warning log when the event queue almost full in the access log module.
* Reduce unessential `conntrack` query when detect new connection.
* Reduce CPU and memory usage in the access log module.
+* Reduce handle connection event time in the access log module.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/bpf/accesslog/common/connection.h
b/bpf/accesslog/common/connection.h
index 880826d..834737a 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -27,7 +27,7 @@
// syscall:connect
struct connect_args_t {
__u32 fd;
- __u32 fix;
+ __u32 has_remote;
struct sockaddr* addr;
struct sock *sock;
__u64 start_nacs;
@@ -305,6 +305,7 @@ static __inline void submit_connection_when_not_exists(void
*ctx, __u64 id, stru
static __inline void notify_close_connection(void* ctx, __u64 conid, struct
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
bpf_map_delete_elem(&socket_data_last_id_map, &conid);
+ bpf_map_delete_elem(&socket_data_id_generate_map, &conid);
struct socket_close_event_t *close_event;
close_event = rover_reserve_buf(&socket_close_event_queue,
sizeof(*close_event));
if (close_event == NULL) {
diff --git a/bpf/accesslog/common/data_args.h b/bpf/accesslog/common/data_args.h
index 2d76062..33eeea6 100644
--- a/bpf/accesslog/common/data_args.h
+++ b/bpf/accesslog/common/data_args.h
@@ -66,14 +66,14 @@ struct sock_data_args_t {
};
struct {
__uint(type, BPF_MAP_TYPE_HASH);
- __uint(max_entries, 10000);
+ __uint(max_entries, 65535);
__type(key, __u64);
__type(value, struct sock_data_args_t);
} socket_data_args SEC(".maps");
struct {
- __uint(type, BPF_MAP_TYPE_HASH);
- __uint(max_entries, 10000);
+ __uint(type, BPF_MAP_TYPE_LRU_HASH);
+ __uint(max_entries, 65535);
__type(key, __u64);
__type(value, __u64);
} socket_data_id_generate_map SEC(".maps");
diff --git a/bpf/accesslog/syscalls/connect_conntrack.c
b/bpf/accesslog/syscalls/connect_conntrack.c
index 4efac48..8f1f5cb 100644
--- a/bpf/accesslog/syscalls/connect_conntrack.c
+++ b/bpf/accesslog/syscalls/connect_conntrack.c
@@ -93,7 +93,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx,
struct nf_conn *ct
}
// already contains the remote address
- if (&(connect_args->remote) != NULL) {
+ if (connect_args->has_remote && &(connect_args->remote) != NULL) {
return 0;
}
@@ -126,6 +126,7 @@ static __always_inline int nf_conn_aware(struct pt_regs*
ctx, struct nf_conn *ct
remote.ipl = reply_conn.saddr_l;
remote.port = reply_conn.sport;
connect_args->remote = remote;
+ connect_args->has_remote = 1;
bpf_map_update_elem(&conecting_args, &id, connect_args, 0);
return 0;
@@ -144,4 +145,4 @@ int nf_confirm(struct pt_regs* ctx) {
SEC("kprobe/ctnetlink_fill_info")
int nf_ctnetlink_fill_info(struct pt_regs* ctx) {
return nf_conn_aware(ctx, (struct nf_conn*)PT_REGS_PARM5(ctx));
-}
+}
\ No newline at end of file
diff --git a/bpf/accesslog/syscalls/connect_conntrack.h
b/bpf/accesslog/syscalls/connect_conntrack.h
index 0e67b9c..06e5072 100644
--- a/bpf/accesslog/syscalls/connect_conntrack.h
+++ b/bpf/accesslog/syscalls/connect_conntrack.h
@@ -96,4 +96,4 @@ struct nf_conn {
struct nf_conntrack_tuple_hash tuplehash[IP_CT_DIR_MAX];
long unsigned int status;
__u32 mark;
-} __attribute__((preserve_access_index));
+} __attribute__((preserve_access_index));
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 7139cf3..bb05af7 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.3.0
github.com/spf13/viper v1.10.1
- github.com/stretchr/testify v1.8.1
+ github.com/stretchr/testify v1.8.4
github.com/zekroTJA/timedmap v1.4.0
golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
golang.org/x/net v0.32.0
@@ -46,7 +46,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mdlayher/netlink v1.7.2 // indirect
- github.com/mdlayher/socket v0.4.1 // indirect
+ github.com/mdlayher/socket v0.5.1 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd //
indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
diff --git a/go.sum b/go.sum
index 1cc3adf..e15c3cc 100644
--- a/go.sum
+++ b/go.sum
@@ -369,8 +369,8 @@ github.com/mdlayher/netlink v1.7.2/go.mod
h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU
github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod
h1:GAFlyu4/XV68LkQKYzKhIo/WW7j3Zi0YRAz/BOoanUc=
github.com/mdlayher/socket v0.0.0-20211007213009-516dcbdf0267/go.mod
h1:nFZ1EtZYK8Gi/k6QNu7z7CgO20i/4ExeQswwWuPmG/g=
github.com/mdlayher/socket v0.1.0/go.mod
h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs=
-github.com/mdlayher/socket v0.4.1
h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
-github.com/mdlayher/socket v0.4.1/go.mod
h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA=
+github.com/mdlayher/socket v0.5.1
h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos=
+github.com/mdlayher/socket v0.5.1/go.mod
h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ=
github.com/miekg/dns v1.0.14/go.mod
h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.26/go.mod
h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41/go.mod
h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
@@ -467,7 +467,6 @@ github.com/spf13/viper v1.10.1/go.mod
h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8q
github.com/stoewer/go-strcase v1.2.0/go.mod
h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.4.0/go.mod
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -476,10 +475,8 @@ github.com/stretchr/testify v1.4.0/go.mod
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0/go.mod
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1
h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
-github.com/stretchr/testify v1.8.1/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4
h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0
h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod
h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tklauser/go-sysconf v0.3.9
h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
diff --git a/pkg/accesslog/collector/connection.go
b/pkg/accesslog/collector/connection.go
index 4fc6823..b7930e2 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -33,6 +33,8 @@ import (
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process"
+ "github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/enums"
@@ -56,7 +58,7 @@ func NewConnectionCollector() *ConnectCollector {
return &ConnectCollector{}
}
-func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext) error {
+func (c *ConnectCollector) Start(m *module.Manager, ctx
*common.AccessLogContext) error {
perCPUBufferSize, err :=
units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
if err != nil {
return err
@@ -73,13 +75,9 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext
if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
return fmt.Errorf("the queue size be small than 1")
}
- track, err := ip.NewConnTrack()
- if err != nil {
- connectionLogger.Warnf("cannot create the connection tracker,
%v", err)
- }
c.eventQueue = btf.NewEventQueue("connection resolver",
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int)
btf.PartitionContext {
- return newConnectionPartitionContext(ctx, track)
+ return NewConnectionPartitionContext(ctx,
m.FindModule(process.ModuleName).(process.K8sOperator))
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize),
ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{}
{
@@ -130,18 +128,18 @@ func (c *ConnectCollector) Stop() {
type ConnectionPartitionContext struct {
context *common.AccessLogContext
- connTracker *ip.ConnTrack
+ k8sOperator process.K8sOperator
}
-func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker
*ip.ConnTrack) *ConnectionPartitionContext {
+func NewConnectionPartitionContext(ctx *common.AccessLogContext,
+ k8sOperator process.K8sOperator) *ConnectionPartitionContext {
return &ConnectionPartitionContext{
context: ctx,
- connTracker: connTracker,
+ k8sOperator: k8sOperator,
}
}
func (c *ConnectionPartitionContext) Start(ctx context.Context) {
-
}
func (c *ConnectionPartitionContext) Consume(data interface{}) {
@@ -151,7 +149,7 @@ func (c *ConnectionPartitionContext) Consume(data
interface{}) {
"pid: %d, fd: %d, role: %s: func: %s, family: %d,
success: %d, conntrack exist: %t",
event.ConID, event.RandomID, event.PID, event.SocketFD,
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
event.SocketFamily, event.ConnectSuccess,
event.ConnTrackUpstreamPort != 0)
- socketPair := c.buildSocketFromConnectEvent(event)
+ socketPair := c.BuildSocketFromConnectEvent(event)
if socketPair == nil {
connectionLogger.Debugf("cannot found the socket paire
from connect event, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
@@ -159,7 +157,6 @@ func (c *ConnectionPartitionContext) Consume(data
interface{}) {
}
connectionLogger.Debugf("build socket pair success, connection
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
event.ConID, event.RandomID, socketPair.Role,
socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
- c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
forwarder.SendConnectEvent(c.context, event, socketPair)
case *events.SocketCloseEvent:
connectionLogger.Debugf("receive close event, connection ID:
%d, randomID: %d, pid: %d, fd: %d",
@@ -169,7 +166,7 @@ func (c *ConnectionPartitionContext) Consume(data
interface{}) {
}
}
-func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event
*events.SocketConnectEvent, result *ip.SocketPair) {
+func (c *ConnectionPartitionContext) FixSocketFamilyIfNeed(event
*events.SocketConnectEvent, result *ip.SocketPair) {
if result == nil {
return
}
@@ -189,39 +186,39 @@ func (c *ConnectionPartitionContext)
fixSocketFamilyIfNeed(event *events.SocketC
}
}
-func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event
*events.SocketConnectEvent) *ip.SocketPair {
+func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event
*events.SocketConnectEvent) *ip.SocketPair {
if event.SocketFamily != unix.AF_INET && event.SocketFamily !=
unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
// if not ipv4, ipv6 or unknown, ignore
return nil
}
- socketPair := c.buildSocketPair(event)
- if socketPair != nil && socketPair.IsValid() {
+ pair := c.BuildSocketPair(event)
+ if pair != nil && pair.IsValid() {
connectionLogger.Debugf("found the connection from the connect
event is valid, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
- return socketPair
+ return pair
}
// if only the local port not success, maybe the upstream port is not
open, so it could be continued
- if c.isOnlyLocalPortEmpty(socketPair) {
+ if c.IsOnlyLocalPortEmpty(pair) {
event.ConnectSuccess = 0
connectionLogger.Debugf("the connection from the connect event
is only the local port is empty, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
- return socketPair
+ return pair
}
pair, err := ip.ParseSocket(event.PID, event.SocketFD)
if err != nil {
- connectionLogger.Debugf("cannot found the socket, pid: %d,
socket FD: %d", event.PID, event.SocketFD)
+ connectionLogger.Debugf("cannot found the socket, pid: %d,
socket FD: %d, error: %v", event.PID, event.SocketFD, err)
return nil
}
connectionLogger.Debugf("found the connection from the socket,
connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
pair.Role = enums.ConnectionRole(event.Role)
- c.fixSocketFamilyIfNeed(event, pair)
- c.tryToUpdateSocketFromConntrack(event, pair)
+ c.FixSocketFamilyIfNeed(event, pair)
+ c.CheckNeedConntrack(event, pair)
return pair
}
-func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair
*ip.SocketPair) bool {
+func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair
*ip.SocketPair) bool {
if socketPair == nil {
return false
}
@@ -233,7 +230,7 @@ func (c *ConnectionPartitionContext)
isOnlyLocalPortEmpty(socketPair *ip.SocketP
return socketPair.IsValid()
}
-func (c *ConnectionPartitionContext) buildSocketPair(event
*events.SocketConnectEvent) *ip.SocketPair {
+func (c *ConnectionPartitionContext) BuildSocketPair(event
*events.SocketConnectEvent) *ip.SocketPair {
var result *ip.SocketPair
haveConnTrack := false
if event.SocketFamily == unix.AF_INET {
@@ -288,22 +285,28 @@ func (c *ConnectionPartitionContext)
buildSocketPair(event *events.SocketConnect
return result
}
- c.fixSocketFamilyIfNeed(event, result)
- c.tryToUpdateSocketFromConntrack(event, result)
+ c.FixSocketFamilyIfNeed(event, result)
+ c.CheckNeedConntrack(event, result)
return result
}
-func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event
*events.SocketConnectEvent, socket *ip.SocketPair) {
- if socket != nil && socket.IsValid() && c.connTracker != nil &&
!tools.IsLocalHostAddress(socket.DestIP) &&
- event.FuncName != enums.SocketFunctionNameAccept { // accept
event don't need to update the remote address
- // if no contract and socket data is valid, then trying to get
the remote address from the socket
- // to encase the remote address is not the real remote address
- originalIP := socket.DestIP
- originalPort := socket.DestPort
- if c.connTracker.UpdateRealPeerAddress(socket) {
- connectionLogger.Debugf("update the socket address from
conntrack success, "+
- "connection ID: %d, randomID: %d, original
remote: %s:%d, new remote: %s:%d",
- event.ConID, event.RandomID, originalIP,
originalPort, socket.DestIP, socket.DestPort)
- }
+func (c *ConnectionPartitionContext) CheckNeedConntrack(event
*events.SocketConnectEvent, socket *ip.SocketPair) {
+ if socket == nil || !socket.IsValid() ||
tools.IsLocalHostAddress(socket.DestIP) ||
+ event.FuncName == enums.SocketFunctionNameAccept || // accept
event don't need to update the remote address
+ !c.context.ConnectionMgr.ProcessIsDetectBy(event.PID,
api.Kubernetes) { // only the k8s process need to update the remote address
from conntrack
+ return
+ }
+
+ isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
+ if err != nil {
+ connectionLogger.Warnf("cannot found the pod IP, connection ID:
%d, randomID: %d, error: %v",
+ event.ConID, event.RandomID, err)
+ }
+ if isPodIP {
+ connectionLogger.Debugf("detect the remote IP is pod IP,
connection ID: %d, randomID: %d, remote: %s",
+ event.ConID, event.RandomID, socket.DestIP)
+ return
}
+ // update to the socket need to update the remote address from conntrack
+ socket.NeedConnTrack = true
}
diff --git a/pkg/accesslog/collector/protocols/connection.go
b/pkg/accesslog/collector/protocols/connection.go
index b6a9e24..66d0ce9 100644
--- a/pkg/accesslog/collector/protocols/connection.go
+++ b/pkg/accesslog/collector/protocols/connection.go
@@ -34,7 +34,6 @@ type PartitionConnection struct {
protocolAnalyzer map[enums.ConnectionProtocol]Protocol
protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics
closed bool
- closeCallback common.ConnectionProcessFinishCallback
skipAllDataAnalyze bool
lastCheckCloseTime time.Time
}
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index a4577f7..b336f9d 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -102,7 +102,7 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue,
int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
- return &events.SocketDataUploadEvent{}
+ return &events.SocketDataUploadEvent{Buffer:
*buffer.BorrowNewBuffer()}
}, func(data interface{}) int {
return
int(data.(*events.SocketDataUploadEvent).ConnectionID)
})
@@ -166,20 +166,6 @@ func NewPartitionContext(ctx *common.AccessLogContext, num
int, protocols []Prot
return pc
}
-func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent,
closeCallback common.ConnectionProcessFinishCallback) {
- conn, exist :=
p.connections.Get(p.buildConnectionKey(event.GetConnectionID(),
event.GetRandomID()))
- if !exist {
- log.Debugf("connection is not exist in the partion context,
connection ID: %d, random ID: %d, partition number: %d",
- event.GetConnectionID(), event.GetRandomID(),
p.partitionNum)
- closeCallback()
- return
- }
- connection := conn.(*PartitionConnection)
- connection.closeCallback = closeCallback
- log.Debugf("receive the connection close event and mark is closable,
connection ID: %d, random ID: %d, partition number: %d",
- event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
-}
-
func (p *PartitionContext) Start(ctx context.Context) {
// process events with interval
flushDuration, _ := time.ParseDuration(p.context.Config.Flush.Period)
@@ -189,7 +175,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
select {
case <-timeTicker.C:
// process event with interval
- p.processEvents()
+ p.ProcessEvents()
case <-ctx.Done():
timeTicker.Stop()
return
@@ -203,7 +189,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
for {
select {
case <-expireTicker.C:
- p.processExpireEvents()
+ p.ProcessExpireEvents()
case <-ctx.Done():
expireTicker.Stop()
return
@@ -225,19 +211,19 @@ func (p *PartitionContext) Consume(data interface{}) {
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
- connection := p.getConnectionContext(event.GetConnectionID(),
event.GetRandomID(), event.GetProtocol(), event.DataID())
+ connection := p.GetConnectionContext(event.GetConnectionID(),
event.GetRandomID(), event.GetProtocol(), event.DataID())
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d,
random ID: %d, pid: %d, prev data id: %d, "+
"data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid,
event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
- connection := p.getConnectionContext(event.ConnectionID,
event.RandomID, event.Protocol0, event.DataID0)
+ connection := p.GetConnectionContext(event.ConnectionID,
event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
}
-func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
+func (p *PartitionContext) GetConnectionContext(connectionID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64)
*PartitionConnection {
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
@@ -259,7 +245,7 @@ func (p *PartitionContext) buildConnectionKey(conID, ranID
uint64) string {
return string(buf)
}
-func (p *PartitionContext) processEvents() {
+func (p *PartitionContext) ProcessEvents() {
// it could be triggered by interval or reach counter
// if any trigger bean locked, the other one just ignore process
if !p.analyzeLocker.TryLock() {
@@ -284,9 +270,6 @@ func (p *PartitionContext) processEvents() {
p.checkTheConnectionIsAlreadyClose(info)
}
if info.closed {
- if info.closeCallback != nil {
- info.closeCallback()
- }
closedConnections = append(closedConnections, conKey)
log.Debugf("detect the connection is already closed,
then notify to the callback, connection ID: %d, random ID: %d, partition
number: %d",
info.connectionID, info.randomID,
p.partitionNum)
@@ -320,7 +303,7 @@ func (p *PartitionContext)
checkTheConnectionIsAlreadyClose(con *PartitionConnec
}
}
-func (p *PartitionContext) processExpireEvents() {
+func (p *PartitionContext) ProcessExpireEvents() {
// the expiry must be mutual exclusion with events processor
p.analyzeLocker.Lock()
defer p.analyzeLocker.Unlock()
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index afac079..1c0a86e 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -42,16 +42,11 @@ import (
cmap "github.com/orcaman/concurrent-map"
- "k8s.io/apimachinery/pkg/util/cache"
-
v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
const (
- // only using to match the remote IP address
- localAddressPairCacheTime = time.Second * 15
-
// clean the active connection in BPF interval
cleanActiveConnectionInterval = time.Second * 20
@@ -62,19 +57,6 @@ const (
connectionCheckExistTime = time.Second * 30
)
-type addressProcessType int
-
-const (
- addressProcessTypeUnknown addressProcessType = iota
- addressProcessTypeLocal
- addressProcessTypeKubernetes
-)
-
-const (
- strLocal = "local"
- strRemote = "remote"
-)
-
type ConnectEventWithSocket struct {
*events.SocketConnectEvent
SocketPair *ip.SocketPair
@@ -82,13 +64,11 @@ type ConnectEventWithSocket struct {
type CloseEventWithNotify struct {
*events.SocketCloseEvent
- allProcessorFinished bool
}
type ConnectionProcessFinishCallback func()
type ConnectionProcessor interface {
- OnConnectionClose(event *events.SocketCloseEvent, callback
ConnectionProcessFinishCallback)
}
type FlusherListener interface {
@@ -105,10 +85,6 @@ type ConnectionManager struct {
moduleMgr *module.Manager
processOP process.Operator
connections cmap.ConcurrentMap
- // addressWithPid cache all local ip+port and pid mapping for match the
process on the same host
- // such as service mesh(process with envoy)
- addressWithPid *cache.Expiring
- localPortWithPid *cache.Expiring // in some case, we can only get the
127.0.0.1 from server side, so we only cache the port for this
// localIPWithPid cache all local monitoring process bind IP address
// for checking the remote address is local or not
localIPWithPid map[string]int32
@@ -124,10 +100,9 @@ type ConnectionManager struct {
processors []ConnectionProcessor
processListeners []ProcessListener
- // connection already close but the connection (protocols)log not build
finished
- allUnfinishedConnections map[string]*bool
-
flushListeners []FlusherListener
+
+ connectTracker *ip.ConnTrack
}
func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) {
@@ -142,10 +117,6 @@ func (c *ConnectionManager)
RegisterNewFlushListener(listener FlusherListener) {
c.flushListeners = append(c.flushListeners, listener)
}
-type addressInfo struct {
- pid uint32
-}
-
type ConnectionInfo struct {
ConnectionID uint64
RandomID uint64
@@ -158,19 +129,21 @@ type ConnectionInfo struct {
}
func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
+ track, err := ip.NewConnTrack()
+ if err != nil {
+ log.Warnf("cannot create the connection tracker, %v", err)
+ }
mgr := &ConnectionManager{
- moduleMgr: moduleMgr,
- processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
- connections: cmap.New(),
- addressWithPid: cache.NewExpiring(),
- localPortWithPid: cache.NewExpiring(),
- localIPWithPid: make(map[string]int32),
- monitoringProcesses:
make(map[int32][]api.ProcessInterface),
- processMonitorMap: bpfLoader.ProcessMonitorControl,
- activeConnectionMap: bpfLoader.ActiveConnectionMap,
- allUnfinishedConnections: make(map[string]*bool),
- monitorFilter: filter,
- flushListeners: make([]FlusherListener, 0),
+ moduleMgr: moduleMgr,
+ processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
+ connections: cmap.New(),
+ localIPWithPid: make(map[string]int32),
+ monitoringProcesses: make(map[int32][]api.ProcessInterface),
+ processMonitorMap: bpfLoader.ProcessMonitorControl,
+ activeConnectionMap: bpfLoader.ActiveConnectionMap,
+ monitorFilter: filter,
+ flushListeners: make([]FlusherListener, 0),
+ connectTracker: track,
}
return mgr
}
@@ -276,39 +249,26 @@ func (c *ConnectionManager) Find(event events.Event)
*ConnectionInfo {
}
func (c *ConnectionManager) buildRemoteAddress(e *events.SocketConnectEvent,
socket *ip.SocketPair) *v3.ConnectionAddress {
- tp := c.isLocalTarget(socket)
- if tp == addressProcessTypeUnknown {
- log.Debugf("building the remote address to unknown, connection:
%d-%d, role: %s, local: %s:%d, remote: %s:%d",
- e.GetConnectionID(), e.GetRandomID(), socket.Role,
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
- return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
+ // if the remote address is local, then no needs to build the
address(access log no need to send by communicate with self)
+ if tools.IsLocalHostAddress(socket.DestIP) {
+ return nil
}
- var addrInfo *addressInfo
- var fromType string
- switch socket.Role {
- case enums.ConnectionRoleClient:
- addrInfo = c.getAddressPid(socket.SrcIP, socket.SrcPort, false)
- fromType = strLocal
- case enums.ConnectionRoleServer:
- addrInfo = c.getAddressPid(socket.DestIP, socket.DestPort, true)
- fromType = strRemote
- }
-
- if addrInfo != nil {
- log.Debugf("building the remote address from %s process, pid:
%d, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d",
- fromType, addrInfo.pid, e.GetConnectionID(),
e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP,
socket.DestPort)
- return c.buildLocalAddress(addrInfo.pid, socket.DestPort,
socket)
- } else if tp == addressProcessTypeKubernetes {
- if p := c.localIPWithPid[socket.DestIP]; p != 0 {
- log.Debugf("building the remote address from kubernetes
process, connection: %d-%d, role: %s, pid: %d, local: %s:%d, remote: %s:%d",
- e.GetConnectionID(), e.GetRandomID(),
socket.Role, p, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
- return c.buildLocalAddress(uint32(p), socket.DestPort,
socket)
+ // if the remote connection is need to use conntrack, then update the
real peer address
+ if socket.NeedConnTrack {
+ if err := c.connectTracker.UpdateRealPeerAddress(socket); err
!= nil {
+ log.Debugf("cannot update the real peer address, %v",
err)
}
}
- log.Debugf("cannot found the peer pid for the connection: %d-%d, remote
type: %v, role: %s, local: %s:%d, remote: %s:%d",
- e.GetConnectionID(), e.GetRandomID(), tp, socket.Role,
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
- return nil
+ // found local address with pid
+ if pid, exist := c.localIPWithPid[socket.DestIP]; exist && pid != 0 {
+ return c.buildLocalAddress(uint32(pid), socket.DestPort, socket)
+ }
+
+ log.Debugf("building the remote address to unknown, connection: %d-%d,
role: %s, local: %s:%d, remote: %s:%d",
+ e.GetConnectionID(), e.GetRandomID(), socket.Role,
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
+ return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
}
func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo,
event events.Event) {
@@ -317,12 +277,7 @@ func (c *ConnectionManager)
connectionPostHandle(connection *ConnectionInfo, eve
}
switch e := event.(type) {
case *CloseEventWithNotify:
- if e.allProcessorFinished {
- connection.MarkDeletable = true
- } else {
- // if not all processor finished, then add into the map
- c.allUnfinishedConnections[fmt.Sprintf("%d_%d",
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
- }
+ connection.MarkDeletable = true
case events.SocketDetail:
tlsMode := connection.RPCConnection.TlsMode
protocol := connection.RPCConnection.Protocol
@@ -368,6 +323,17 @@ func (c *ConnectionManager) ProcessIsMonitor(pid uint32)
bool {
return len(c.monitoringProcesses[int32(pid)]) > 0
}
+func (c *ConnectionManager) ProcessIsDetectBy(pid uint32, detectType
api.ProcessDetectType) bool {
+ c.monitoringProcessLock.RLock()
+ defer c.monitoringProcessLock.RUnlock()
+ for _, p := range c.monitoringProcesses[int32(pid)] {
+ if p.DetectType() == detectType {
+ return true
+ }
+ }
+ return false
+}
+
func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent,
socket *ip.SocketPair,
local, remote *v3.ConnectionAddress) *ConnectionInfo {
var role v32.DetectPoint
@@ -436,85 +402,9 @@ func (c *ConnectionManager) buildAddressFromRemote(ipHost
string, port uint16) *
}
func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent)
*CloseEventWithNotify {
- result := &CloseEventWithNotify{
- SocketCloseEvent: event,
- allProcessorFinished: false,
- }
- processCount := len(c.processors)
- for _, l := range c.processors {
- l.OnConnectionClose(event, func() {
- processCount--
- if processCount > 0 {
- return
- }
- result.allProcessorFinished = true
- })
+ return &CloseEventWithNotify{
+ SocketCloseEvent: event,
}
- return result
-}
-
-func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16,
localPid bool, pid uint32) {
- localAddrInfo := &addressInfo{
- pid: pid,
- }
- c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port,
localPid), localAddrInfo, localAddressPairCacheTime)
- localStr := strRemote
- if localPid {
- localStr = strLocal
- }
- log.Debugf("saving the %s address with pid cache, address: %s:%d, pid:
%d", localStr, hostAddress, port, pid)
-}
-
-func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16,
localPid bool) *addressInfo {
- addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t",
hostAddress, port, localPid))
- if ok && addrInfo != nil {
- return addrInfo.(*addressInfo)
- }
- return nil
-}
-
-func (c *ConnectionManager) OnConnectEvent(event *events.SocketConnectEvent,
pair *ip.SocketPair) {
- // only adding the local ip port when remote is local address
- switch c.isLocalTarget(pair) {
- case addressProcessTypeUnknown:
- log.Debugf("the target address is not local, so no needs to
save the cache. "+
- "address: %s:%d, pid: %d", pair.DestIP, pair.DestPort,
event.PID)
- case addressProcessTypeLocal:
- switch pair.Role {
- case enums.ConnectionRoleClient:
- // if current is client, so the local port should be
unique
- c.savingTheAddress(pair.SrcIP, pair.SrcPort, true,
event.PID)
- case enums.ConnectionRoleServer:
- // if current is server, so the remote port should be
unique
- c.savingTheAddress(pair.DestIP, pair.DestPort, false,
event.PID)
- case enums.ConnectionRoleUnknown:
- log.Debugf("the target address local but unknown role,
so no needs to save the cache. socket: [%s], pid: %d",
- pair, event.PID)
- }
- case addressProcessTypeKubernetes:
- switch pair.Role {
- case enums.ConnectionRoleClient:
- // if current is client, so the local port should be
unique
- c.savingTheAddress(pair.SrcIP, pair.SrcPort, true,
event.PID)
- case enums.ConnectionRoleServer:
- // if current is server, so the remote port should be
unique
- c.savingTheAddress(pair.DestIP, pair.DestPort, false,
event.PID)
- case enums.ConnectionRoleUnknown:
- log.Debugf("the target address kubernetes but unknown
role, so no needs to save the cache. socket: [%s], pid: %d",
- pair, event.PID)
- }
- }
-}
-
-func (c *ConnectionManager) isLocalTarget(pair *ip.SocketPair)
addressProcessType {
- destIP := pair.DestIP
- if tools.IsLocalHostAddress(destIP) {
- return addressProcessTypeLocal
- }
- if _, exist := c.localIPWithPid[destIP]; exist {
- return addressProcessTypeKubernetes
- }
- return addressProcessTypeUnknown
}
func (c *ConnectionManager) AddNewProcess(pid int32, entities
[]api.ProcessInterface) {
@@ -706,20 +596,6 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished()
{
}
})
- deleteFromUnfinished := make([]string, 0)
- for conKey, processorFinished := range c.allUnfinishedConnections {
- if *processorFinished {
- deletableConnections[conKey] = true
- deleteFromUnfinished = append(deleteFromUnfinished,
conKey)
- } else {
- // if the processor not finished, then ignore it from
deletable connections
- delete(deletableConnections, conKey)
- }
- }
- for _, key := range deleteFromUnfinished {
- delete(c.allUnfinishedConnections, key)
- }
-
for key := range deletableConnections {
log.Debugf("deleting the connection in manager: %s", key)
c.connections.Remove(key)
diff --git a/pkg/accesslog/common/filter.go b/pkg/accesslog/common/filter.go
index 0d60012..34baee3 100644
--- a/pkg/accesslog/common/filter.go
+++ b/pkg/accesslog/common/filter.go
@@ -18,6 +18,7 @@
package common
import (
+ "os"
"strings"
"github.com/apache/skywalking-rover/pkg/process/api"
@@ -46,7 +47,11 @@ func NewStaticMonitorFilter(namespaces, clusters []string)
*StaticMonitorFilter
}
func (s *StaticMonitorFilter) ShouldIncludeProcesses(processes
[]api.ProcessInterface) (res []api.ProcessInterface) {
+ var selfPid = os.Getpid()
for _, entity := range processes {
+ if int(entity.Pid()) == selfPid {
+ continue
+ }
if entity.DetectType() != api.Kubernetes { // for now, we only
have the kubernetes detected processes
continue
}
diff --git a/pkg/accesslog/events/close.go b/pkg/accesslog/events/close.go
index acd0d61..d4aaa1b 100644
--- a/pkg/accesslog/events/close.go
+++ b/pkg/accesslog/events/close.go
@@ -20,7 +20,7 @@ package events
import (
"time"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
@@ -35,7 +35,7 @@ type SocketCloseEvent struct {
Success uint32
}
-func (c *SocketCloseEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketCloseEvent) ReadFrom(r btf.Reader) {
c.ConnectionID = r.ReadUint64()
c.RandomID = r.ReadUint64()
c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/connect.go b/pkg/accesslog/events/connect.go
index 9433168..daf6458 100644
--- a/pkg/accesslog/events/connect.go
+++ b/pkg/accesslog/events/connect.go
@@ -20,7 +20,7 @@ package events
import (
"time"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
@@ -47,7 +47,7 @@ type SocketConnectEvent struct {
ConnTrackUpstreamPort uint32
}
-func (c *SocketConnectEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketConnectEvent) ReadFrom(r btf.Reader) {
c.ConID = r.ReadUint64()
c.RandomID = r.ReadUint64()
c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 6ecfd2f..85bbd76 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -20,7 +20,7 @@ package events
import (
"fmt"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/enums"
)
@@ -41,7 +41,11 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}
-func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
+func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte {
+ return &s.Buffer
+}
+
+func (s *SocketDataUploadEvent) ReadFrom(r btf.Reader) {
s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8())
s.HaveReduce = r.ReadUint8()
s.Direction0 = enums.SocketDataDirection(r.ReadUint8())
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index a5ac46c..5beb5df 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -20,7 +20,7 @@ package events
import (
"time"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
@@ -73,7 +73,7 @@ type SocketDetailEvent struct {
SSL uint8
}
-func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) {
+func (d *SocketDetailEvent) ReadFrom(r btf.Reader) {
d.ConnectionID = r.ReadUint64()
d.RandomID = r.ReadUint64()
d.DataID0 = r.ReadUint64()
diff --git a/pkg/accesslog/events/events_test.go
b/pkg/accesslog/events/events_test.go
index 1da9f3e..f2c11bd 100644
--- a/pkg/accesslog/events/events_test.go
+++ b/pkg/accesslog/events/events_test.go
@@ -25,16 +25,16 @@ import (
"strings"
"testing"
- "github.com/stretchr/testify/assert"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+ "github.com/stretchr/testify/assert"
)
// nolint
func TestBufferRead(t *testing.T) {
tests := []struct {
hex string
- create func() reader.EventReader
+ create func() btf.EventReader
}{
{
hex: `
@@ -46,7 +46,7 @@ func TestBufferRead(t *testing.T) {
00 00 00 00 00 00 00 00 00 00 ff ff 7f 00 00 01
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketConnectEvent{}
},
},
@@ -59,7 +59,7 @@ func TestBufferRead(t *testing.T) {
23 4a 01 00 bb 49 01 00 00 00 00 00 e4 01 00 00
24 21 00 00 01 00 00 00 39 d6 00 00 00 00 00 00
03 00 00 00 00 00 00 00 02 02 00 02 02 09 01 00`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketDetailEvent{}
},
},
@@ -69,7 +69,7 @@ func TestBufferRead(t *testing.T) {
b2 2d 26 7c 5a 30 02 00 5e 34 26 7c 5a 30 02 00
7a a1 00 00 04 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketCloseEvent{}
},
},
@@ -209,7 +209,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00
`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketDataUploadEvent{}
},
},
@@ -226,7 +226,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
}
binaryRead := test.create()
selfRead := test.create()
- bufReader := reader.NewReader(rawData)
+ bufReader := btf.NewReader(rawData)
selfRead.ReadFrom(bufReader)
if err := bufReader.HasError(); err != nil {
t.Fatalf("reading by self parsing error: %v",
err)
diff --git a/pkg/accesslog/events/ztunnel.go b/pkg/accesslog/events/ztunnel.go
index abd2f1c..81572c3 100644
--- a/pkg/accesslog/events/ztunnel.go
+++ b/pkg/accesslog/events/ztunnel.go
@@ -17,7 +17,9 @@
package events
-import "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+import (
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
+)
type ZTunnelSocketMappingEvent struct {
OriginalSrcIP uint32
@@ -30,7 +32,7 @@ type ZTunnelSocketMappingEvent struct {
Pad1 uint32
}
-func (z *ZTunnelSocketMappingEvent) ReadFrom(r *reader.Reader) {
+func (z *ZTunnelSocketMappingEvent) ReadFrom(r btf.Reader) {
z.OriginalSrcIP = r.ReadUint32()
z.OriginalDestIP = r.ReadUint32()
z.OriginalSrcPort = r.ReadUint16()
diff --git a/pkg/process/api.go b/pkg/process/api.go
index b84453b..ca11907 100644
--- a/pkg/process/api.go
+++ b/pkg/process/api.go
@@ -37,4 +37,6 @@ type Operator interface {
type K8sOperator interface {
// NodeName get the node name
NodeName() string
+ // IsPodIP check the ip is pod ip
+ IsPodIP(ip string) (bool, error)
}
diff --git a/pkg/process/finders/kubernetes/finder.go
b/pkg/process/finders/kubernetes/finder.go
index 03a0ba7..58282ea 100644
--- a/pkg/process/finders/kubernetes/finder.go
+++ b/pkg/process/finders/kubernetes/finder.go
@@ -21,10 +21,12 @@ import (
"bufio"
"context"
"fmt"
+ "hash/fnv"
"os"
"regexp"
"strconv"
"strings"
+ "sync"
"time"
lru "github.com/hashicorp/golang-lru"
@@ -36,6 +38,10 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/util/cache"
+ "k8s.io/apimachinery/pkg/util/rand"
+
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -53,6 +59,8 @@ var log = logger.GetLogger("process", "finder", "kubernetes")
var (
kubepodsRegex =
regexp.MustCompile(`cri-containerd-(?P<Group>\w+)\.scope`)
openShiftPodsRegex = regexp.MustCompile(`crio-(?P<Group>\w+)\.scope`)
+ ipExistTimeout = time.Minute * 10
+ ipSearchParallel = 10
)
type ProcessFinder struct {
@@ -73,6 +81,10 @@ type ProcessFinder struct {
// runtime config
namespaces []string
+
+ // for IsPodIP check
+ podIPChecker *cache.Expiring
+ podIPMutexes map[int]*sync.Mutex
}
func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig,
manager base.ProcessManager) error {
@@ -89,11 +101,16 @@ func (f *ProcessFinder) Init(ctx context.Context, conf
base.FinderBaseConfig, ma
f.stopChan = make(chan struct{}, 1)
f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
f.manager = manager
- cache, err := lru.New(5000)
+ f.podIPChecker = cache.NewExpiring()
+ f.podIPMutexes = make(map[int]*sync.Mutex)
+ for i := 0; i < ipSearchParallel; i++ {
+ f.podIPMutexes[i] = &sync.Mutex{}
+ }
+ processCache, err := lru.New(5000)
if err != nil {
return err
}
- f.processCache = cache
+ f.processCache = processCache
return nil
}
@@ -277,7 +294,7 @@ func (f *ProcessFinder) getProcessCGroup(pid int32)
([]string, error) {
}
defer cgroupFile.Close()
- cache := make(map[string]bool)
+ cgroups := make(map[string]bool)
scanner := bufio.NewScanner(cgroupFile)
for scanner.Scan() {
infos := strings.Split(scanner.Text(), ":")
@@ -295,14 +312,14 @@ func (f *ProcessFinder) getProcessCGroup(pid int32)
([]string, error) {
if openShiftPod :=
openShiftPodsRegex.FindStringSubmatch(path); len(openShiftPod) >= 1 {
path = openShiftPod[1]
}
- cache[path] = true
+ cgroups[path] = true
}
}
- if len(cache) == 0 {
+ if len(cgroups) == 0 {
return nil, fmt.Errorf("no cgroups")
}
result := make([]string, 0)
- for k := range cache {
+ for k := range cgroups {
result = append(result, k)
}
return result, nil
@@ -400,3 +417,32 @@ func (f *ProcessFinder) ShouldMonitor(pid int32) bool {
f.manager.AddDetectedProcess(processes)
return true
}
+
+func (f *ProcessFinder) IsPodIP(ip string) (bool, error) {
+ val, exist := f.podIPChecker.Get(ip)
+ if exist {
+ return val.(bool), nil
+ }
+
+ // parallels the search
+ h := fnv.New32a()
+ h.Write([]byte(ip))
+ sum32 := int(h.Sum32())
+ mutex := f.podIPMutexes[sum32%ipSearchParallel]
+ mutex.Lock()
+ defer mutex.Unlock()
+
+ pods, err := f.CLI.CoreV1().Pods(v1.NamespaceAll).List(f.ctx,
metav1.ListOptions{
+ FieldSelector: fields.OneTermEqualSelector("status.podIP",
ip).String(),
+ Limit: 1,
+ })
+ if err != nil {
+ return false, err
+ }
+ found := len(pods.Items) > 0
+
+ // the timeout added a random value to avoid the cache avalanche
+ addedTime := time.Second * time.Duration(rand.IntnRange(10, 60))
+ f.podIPChecker.Set(ip, found, ipExistTimeout+addedTime)
+ return found, nil
+}
diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go
index dbe99c0..a252416 100644
--- a/pkg/process/finders/manager.go
+++ b/pkg/process/finders/manager.go
@@ -113,6 +113,15 @@ func (m *ProcessManager) Shutdown() error {
return result
}
+func (m *ProcessManager) Finder(finderType api.ProcessDetectType)
(base.ProcessFinder, bool) {
+ for _, finder := range m.finders {
+ if finder.DetectType() == finderType {
+ return finder, true
+ }
+ }
+ return nil, false
+}
+
func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager {
return p.moduleManager
}
diff --git a/pkg/process/module.go b/pkg/process/module.go
index 648b8f6..95063b5 100644
--- a/pkg/process/module.go
+++ b/pkg/process/module.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/process/finders"
+ "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
)
const ModuleName = "process_discovery"
@@ -105,3 +106,11 @@ func (m *Module) ShouldMonitor(pid int32) bool {
func (m *Module) NodeName() string {
return m.config.Kubernetes.NodeName
}
+
+func (m *Module) IsPodIP(ip string) (bool, error) {
+ k8sFinder, exist := m.manager.Finder(api.Kubernetes)
+ if !exist {
+ return false, nil
+ }
+ return k8sFinder.(*kubernetes.ProcessFinder).IsPodIP(ip)
+}
diff --git a/pkg/profiling/task/network/analyze/events/data.go
b/pkg/profiling/task/network/analyze/events/data.go
index bc451fc..3c59378 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -40,6 +40,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}
+func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte {
+ return &s.Buffer
+}
+
func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
return s.Protocol0
}
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go
b/pkg/profiling/task/network/analyze/layer7/events.go
index 1c01084..359e466 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -20,6 +20,8 @@ package layer7
import (
"context"
+ "github.com/apache/skywalking-rover/pkg/tools/buffer"
+
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
analyzeBase
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
@@ -37,7 +39,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize
int, config *profili
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader)
{
// socket buffer data
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
- return &analyzeBase.SocketDataUploadEvent{}
+ return &analyzeBase.SocketDataUploadEvent{Buffer:
*buffer.BorrowNewBuffer()}
}, func(data interface{}) int {
return
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
})
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..3abb43b 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -30,7 +30,6 @@ import (
"golang.org/x/arch/arm64/arm64asm"
"golang.org/x/arch/x86/x86asm"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/elf"
"github.com/apache/skywalking-rover/pkg/tools/process"
@@ -167,7 +166,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader
RingBufferReader, data
func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader
RingBufferReader, perCPUBuffer,
parallels int, dataSupplier func() interface{}) {
- rd, err := newQueueReader(emap, perCPUBuffer)
+ rd, err := perf.NewReader(emap, perCPUBuffer)
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open ring
buffer error: %v", err))
return
@@ -178,42 +177,53 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap
*ebpf.Map, bufReader RingBuff
}
m.closers = append(m.closers, rd)
+ recordBuilder := newPerfRecordBuilder(dataSupplier())
for i := 0; i < parallels; i++ {
- m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+ m.asyncReadEvent(rd, emap, recordBuilder, dataSupplier,
bufReader)
}
}
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier
func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, recordPool
*perfRecordBuilder,
+ dataSupplier func() interface{}, bufReader RingBufferReader) {
go func() {
for {
- sample, err := rd.Read()
+ record := recordPool.GetRecord()
+ err := rd.ReadInto(record)
if err != nil {
+ recordPool.PutRecord(record)
if errors.Is(err, perf.ErrClosed) {
return
}
log.Warnf("read from %s ringbuffer error: %v",
emap.String(), err)
continue
}
- if len(sample) == 0 {
+
+ if record.LostSamples != 0 {
+ log.Warnf("perf event queue(%s) full, dropped
%d samples", emap.String(), record.LostSamples)
+ recordPool.PutRecord(record)
continue
}
data := dataSupplier()
- if r, ok := data.(reader.EventReader); ok {
- sampleReader := reader.NewReader(sample)
+ if r, ok := data.(EventReader); ok {
+ sampleReader := NewReader(record.RawSample)
r.ReadFrom(sampleReader)
if readErr := sampleReader.HasError(); readErr
!= nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+ recordPool.PutRecord(record)
continue
}
} else {
- if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ if err :=
binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err
!= nil {
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+ recordPool.PutRecord(record)
continue
}
}
bufReader(data)
+
+ recordPool.PutRecord(record)
}
}()
}
@@ -402,3 +412,34 @@ func (m *Linker) Close() error {
})
return err
}
+
+type perfRecordBuilder struct {
+ dataSize int
+ pool sync.Pool
+}
+
+func newPerfRecordBuilder(data interface{}) *perfRecordBuilder {
+ // added 8 bytes means fix some event not aligned
+ var size = binary.Size(data) + 8
+ if r, ok := data.(EventReader); ok {
+ reader := newSizeCalcReader()
+ r.ReadFrom(reader)
+ size = reader.Size() + 8
+ }
+
+ builder := &perfRecordBuilder{
+ dataSize: size,
+ }
+ builder.pool.New = func() any {
+ return &perf.Record{RawSample: make([]byte, 0,
builder.dataSize)}
+ }
+ return builder
+}
+
+func (p *perfRecordBuilder) GetRecord() *perf.Record {
+ return p.pool.Get().(*perf.Record)
+}
+
+func (p *perfRecordBuilder) PutRecord(r *perf.Record) {
+ p.pool.Put(r)
+}
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index fdd8754..eff29c8 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,90 +19,16 @@ package btf
import (
"context"
- "fmt"
"sync"
"time"
"github.com/cilium/ebpf"
- "github.com/cilium/ebpf/perf"
- "github.com/cilium/ebpf/ringbuf"
)
// queueChannelReducingCountCheckInterval is the interval to check the queue
channel reducing count
// if the reducing count is almost full, then added a warning log
const queueChannelReducingCountCheckInterval = time.Second * 5
-type queueReader interface {
- Read() ([]byte, error)
- Close() error
-}
-
-func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
- switch emap.Type() {
- case ebpf.RingBuf:
- return newRingBufReader(emap)
- case ebpf.PerfEventArray:
- return newPerfQueueReader(emap, perCPUBuffer)
- }
- return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
-}
-
-type perfQueueReader struct {
- name string
- reader *perf.Reader
-}
-
-func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader,
error) {
- reader, err := perf.NewReader(emap, perCPUBuffer)
- if err != nil {
- return nil, err
- }
- return &perfQueueReader{reader: reader, name: emap.String()}, nil
-}
-
-func (p *perfQueueReader) Read() ([]byte, error) {
- read, err := p.reader.Read()
- if err != nil {
- return nil, err
- }
-
- if read.LostSamples != 0 {
- log.Warnf("perf event queue(%s) full, dropped %d samples",
p.name, read.LostSamples)
- return nil, nil
- }
-
- return read.RawSample, nil
-}
-
-func (p *perfQueueReader) Close() error {
- return p.reader.Close()
-}
-
-type ringBufReader struct {
- reader *ringbuf.Reader
- name string
-}
-
-func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
- reader, err := ringbuf.NewReader(emap)
- if err != nil {
- return nil, err
- }
- return &ringBufReader{reader: reader, name: emap.String()}, nil
-}
-
-func (r *ringBufReader) Read() ([]byte, error) {
- read, err := r.reader.Read()
- if err != nil {
- return nil, err
- }
- return read.RawSample, nil
-}
-
-func (r *ringBufReader) Close() error {
- return r.reader.Close()
-}
-
type PartitionContext interface {
Start(ctx context.Context)
Consume(data interface{})
diff --git a/pkg/tools/btf/reader/reader.go b/pkg/tools/btf/reader.go
similarity index 62%
rename from pkg/tools/btf/reader/reader.go
rename to pkg/tools/btf/reader.go
index d2e980e..5cae4d5 100644
--- a/pkg/tools/btf/reader/reader.go
+++ b/pkg/tools/btf/reader.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package reader
+package btf
import (
"encoding/binary"
@@ -25,11 +25,20 @@ import (
// EventReader read the sample data by self, instant of binary.Read
type EventReader interface {
// ReadFrom read buffer data
- ReadFrom(reader *Reader)
+ ReadFrom(reader Reader)
}
-// Reader buffer sample reader
-type Reader struct {
+type Reader interface {
+ HasError() error
+ ReadUint64() uint64
+ ReadUint32() uint32
+ ReadUint16() uint16
+ ReadUint8() uint8
+ ReadUint8Array(a []uint8, size int)
+}
+
+// BytesReader buffer sample reader
+type BytesReader struct {
Sample []byte
CurrentOffset int
sampleLen int
@@ -37,8 +46,8 @@ type Reader struct {
}
// NewReader create a reader from BPF buffer
-func NewReader(sample []byte) *Reader {
- return &Reader{
+func NewReader(sample []byte) Reader {
+ return &BytesReader{
Sample: sample,
CurrentOffset: 0,
sampleLen: len(sample),
@@ -46,11 +55,11 @@ func NewReader(sample []byte) *Reader {
}
// HasError is there have error when reading buffer
-func (r *Reader) HasError() error {
+func (r *BytesReader) HasError() error {
return r.err
}
-func (r *Reader) ReadUint64() uint64 {
+func (r *BytesReader) ReadUint64() uint64 {
bytes, err := r.read(8)
if err != nil {
return 0
@@ -58,7 +67,7 @@ func (r *Reader) ReadUint64() uint64 {
return binary.LittleEndian.Uint64(bytes)
}
-func (r *Reader) ReadUint32() uint32 {
+func (r *BytesReader) ReadUint32() uint32 {
bytes, err := r.read(4)
if err != nil {
return 0
@@ -66,7 +75,7 @@ func (r *Reader) ReadUint32() uint32 {
return binary.LittleEndian.Uint32(bytes)
}
-func (r *Reader) ReadUint16() uint16 {
+func (r *BytesReader) ReadUint16() uint16 {
bytes, err := r.read(2)
if err != nil {
return 0
@@ -74,7 +83,7 @@ func (r *Reader) ReadUint16() uint16 {
return binary.LittleEndian.Uint16(bytes)
}
-func (r *Reader) ReadUint8() uint8 {
+func (r *BytesReader) ReadUint8() uint8 {
bytes, err := r.read(1)
if err != nil {
return 0
@@ -82,7 +91,7 @@ func (r *Reader) ReadUint8() uint8 {
return bytes[0]
}
-func (r *Reader) ReadUint8Array(a []uint8, size int) {
+func (r *BytesReader) ReadUint8Array(a []uint8, size int) {
read, err := r.read(size)
if err != nil {
return
@@ -90,7 +99,7 @@ func (r *Reader) ReadUint8Array(a []uint8, size int) {
copy(a, read)
}
-func (r *Reader) read(size int) ([]byte, error) {
+func (r *BytesReader) read(size int) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
@@ -103,3 +112,43 @@ func (r *Reader) read(size int) ([]byte, error) {
r.CurrentOffset += size
return bytes, nil
}
+
+type sizeCalcReader struct {
+ size int
+}
+
+func newSizeCalcReader() *sizeCalcReader {
+ return &sizeCalcReader{}
+}
+
+func (r *sizeCalcReader) HasError() error {
+ return nil
+}
+
+func (r *sizeCalcReader) ReadUint64() uint64 {
+ r.size += 8
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint32() uint32 {
+ r.size += 4
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint16() uint16 {
+ r.size += 2
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint8() uint8 {
+ r.size++
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint8Array(a []uint8, size int) {
+ r.size += size
+}
+
+func (r *sizeCalcReader) Size() int {
+ return r.size
+}
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 368e1f6..33f6d1c 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -37,8 +37,18 @@ var (
emptyList = list.New()
log = logger.GetLogger("tools", "buffer")
+
+ PooledBuffer = sync.Pool{
+ New: func() any {
+ return &[2048]byte{}
+ },
+ }
)
+func BorrowNewBuffer() *[2048]byte {
+ return PooledBuffer.Get().(*[2048]byte)
+}
+
type SocketDataBuffer interface {
// Protocol of the buffer
Protocol() enums.ConnectionProtocol
@@ -71,6 +81,8 @@ type SocketDataBuffer interface {
StartTime() uint64
// EndTime the data end timestamp
EndTime() uint64
+
+ ReleaseBuffer() *[2048]byte
}
type SocketDataDetail interface {
@@ -259,60 +271,19 @@ func (r *Buffer) Clean() {
// nolint
func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
dataEvents := list.New()
- detailEvents := list.New()
- var firstDetailElement *list.Element
- var lastBufferDataID = start.DataID()
for nextElement := start.element; nextElement != end.element;
nextElement = nextElement.Next() {
if nextElement == nil || nextElement.Value == nil {
break
}
currentBuffer := nextElement.Value.(SocketDataBuffer)
- // found first matches detail event
- if detailEvents.Len() == 0 || firstDetailElement == nil {
- for e := r.detailEvents.Front(); e != nil; e = e.Next()
{
- if e.Value == nil {
- continue
- }
- if e.Value.(SocketDataDetail).DataID() >=
currentBuffer.DataID() {
- detailEvents.PushBack(e.Value)
- firstDetailElement = e
- break
- }
- }
- }
dataEvents.PushBack(currentBuffer)
- lastBufferDataID = currentBuffer.DataID()
}
lastBuffer := end.element.Value.(SocketDataBuffer)
dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer:
lastBuffer, Size: end.bufIndex})
- // if the first detail element been found, append the details until the
last buffer data id
- var lastBufferID = lastBufferDataID
- if lastBuffer != nil {
- lastBufferID = lastBuffer.DataID()
- }
- if firstDetailElement == nil && r.detailEvents != nil {
- for e := r.detailEvents.Front(); e != nil; e = e.Next() {
- if e.Value != nil &&
e.Value.(SocketDataDetail).DataID() == lastBufferID {
- detailEvents.PushBack(e.Value)
- break
- }
- }
- } else if firstDetailElement != nil &&
firstDetailElement.Value.(SocketDataDetail).DataID() != lastBufferID {
- for tmp := firstDetailElement.Next(); tmp != nil; tmp =
tmp.Next() {
- if tmp.Value == nil {
- continue
- }
- if tmp.Value.(SocketDataDetail).DataID() > lastBufferID
{
- break
- }
- detailEvents.PushBack(tmp.Value)
- }
- }
-
return &Buffer{
dataEvents: dataEvents,
- detailEvents: detailEvents,
+ detailEvents: emptyList,
validated: validated,
head: &Position{element: dataEvents.Front(),
bufIndex: start.bufIndex},
current: &Position{element: dataEvents.Front(),
bufIndex: start.bufIndex},
@@ -347,6 +318,9 @@ func (r *Buffer) BuildDetails() *list.List {
}
for e := r.originalBuffer.detailEvents.Front(); e != nil; e =
e.Next() {
+ if e.Value == nil {
+ continue
+ }
if e.Value.(SocketDataDetail).DataID() >= fromDataID &&
e.Value.(SocketDataDetail).DataID() <= endDataID {
events.PushBack(e.Value)
}
@@ -358,7 +332,7 @@ func (r *Buffer) BuildDetails() *list.List {
dataIDList = append(dataIDList,
e.Value.(SocketDataDetail).DataID())
}
}
- log.Debugf("cannot found details from original buffer,
from data id: %d, end data id: %d, "+
+ log.Infof("cannot found details from original buffer,
from data id: %d, end data id: %d, "+
"ref: %p, existing details data id list: %v",
fromDataID, endDataID, r.originalBuffer, dataIDList)
}
@@ -409,6 +383,20 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
return r.dataEvents.Back().Value.(SocketDataBuffer)
}
+func (r *Buffer) TotalBuffer() []SocketDataBuffer {
+ if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
+ return nil
+ }
+ result := make([]SocketDataBuffer, 0, r.dataEvents.Len())
+ for e := r.dataEvents.Front(); e != nil; e = e.Next() {
+ if e.Value == nil {
+ continue
+ }
+ result = append(result, e.Value.(SocketDataBuffer))
+ }
+ return result
+}
+
// DetectNotSendingLastPosition detect the buffer contains not sending data:
the BPF limited socket data count
func (r *Buffer) DetectNotSendingLastPosition() *Position {
if r == nil || r.dataEvents.Len() == 0 {
@@ -642,7 +630,7 @@ func (r *Buffer) PrepareForReading() bool {
if r.shouldResetPosition {
r.ResetForLoopReading()
r.shouldResetPosition = false
- return false
+ return r.PrepareForReading()
}
if r.head == nil || r.head.element == nil {
// read in the first element
@@ -732,6 +720,11 @@ func (r *Buffer) removeElement0(element *list.Element)
*list.Element {
}
result := element.Next()
r.dataEvents.Remove(element)
+ if element.Value != nil {
+ if b, ok := element.Value.(SocketDataBuffer); ok && b != nil {
+ PooledBuffer.Put(b.ReleaseBuffer())
+ }
+ }
return result
}
@@ -819,10 +812,14 @@ func (r *Buffer) DeleteExpireEvents(expireDuration
time.Duration) int {
expireTime := time.Now().Add(-expireDuration)
// data event queue
count := r.deleteEventsWithJudgement(r.dataEvents, func(element
*list.Element) bool {
+ if element.Value == nil {
+ return true
+ }
buffer := element.Value.(SocketDataBuffer)
startTime := host.Time(buffer.StartTime())
if expireTime.After(startTime) {
r.latestExpiredDataID = buffer.DataID()
+ PooledBuffer.Put(buffer.ReleaseBuffer())
return true
}
return false
diff --git a/pkg/tools/ip.go b/pkg/tools/ip.go
index 24305b9..b3a34fa 100644
--- a/pkg/tools/ip.go
+++ b/pkg/tools/ip.go
@@ -38,7 +38,7 @@ func DefaultHostIPAddress() string {
// HostIPAddressV4 found the IPV4 address from appoint net interface name
func HostIPAddressV4(name string) string {
- address := host.ipAddresses[name]
+ address := host.ipAddressesByName[name]
if address == nil {
return ""
}
@@ -47,7 +47,7 @@ func HostIPAddressV4(name string) string {
// HostIPAddressV6 found the IPV6 address from appoint net interface name
func HostIPAddressV6(name string) string {
- address := host.ipAddresses[name]
+ address := host.ipAddressesByName[name]
if address == nil {
return ""
}
@@ -56,10 +56,8 @@ func HostIPAddressV6(name string) string {
// IsLocalHostAddress is the address from local
func IsLocalHostAddress(address string) bool {
- for _, h := range host.ipAddresses {
- if h.ipV4 == address || h.ipV6 == address {
- return true
- }
+ if host.ipAddressesByIP[address] {
+ return true
}
return address == "0.0.0.0"
}
@@ -73,8 +71,9 @@ type hostInfo struct {
// hostname
name string
// ip address
- ipAddresses map[string]*hostIPAddress
- defaultIPAddr string
+ ipAddressesByName map[string]*hostIPAddress
+ ipAddressesByIP map[string]bool
+ defaultIPAddr string
}
type hostIPAddress struct {
@@ -83,7 +82,7 @@ type hostIPAddress struct {
}
func queryHostInfo() *hostInfo {
- addresses, def, err := localIPAddress0()
+ addressesByName, def, err := localIPAddress0()
if err != nil {
panic(err)
}
@@ -91,7 +90,12 @@ func queryHostInfo() *hostInfo {
if err != nil {
panic(err)
}
- return &hostInfo{name: name, ipAddresses: addresses, defaultIPAddr: def}
+ addressesByIP := make(map[string]bool)
+ for _, addr := range addressesByName {
+ addressesByIP[addr.ipV4] = true
+ addressesByIP[addr.ipV6] = true
+ }
+ return &hostInfo{name: name, ipAddressesByName: addressesByName,
ipAddressesByIP: addressesByIP, defaultIPAddr: def}
}
func hostname0() (string, error) {
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 72dc1b9..3e3f934 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -18,14 +18,15 @@
package ip
import (
+ "fmt"
"net"
"syscall"
"github.com/florianl/go-conntrack"
- "github.com/apache/skywalking-rover/pkg/logger"
-
"golang.org/x/sys/unix"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
)
var log = logger.GetLogger("tools", "ip")
@@ -47,10 +48,11 @@ func NewConnTrack() (*ConnTrack, error) {
if err != nil {
return nil, err
}
+
return &ConnTrack{tracker: nfct}, nil
}
-func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) bool {
+func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error {
family := conntrack.IPv4
if addr.Family == unix.AF_INET6 {
family = conntrack.IPv6
@@ -64,17 +66,18 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair)
bool {
session, e := c.tracker.Get(conntrack.Conntrack, family,
conntrack.Con{Origin: tuple})
if e != nil {
// try to get the reply session, if the info not exists
or from accept events, have error is normal
- log.Debugf("cannot get the conntrack session, type: %s,
family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v", info.name,
+ return fmt.Errorf("cannot get the conntrack session,
type: %s, family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v",
info.name,
family, tuple.Src, *tuple.Proto.SrcPort,
tuple.Dst, *tuple.Proto.DstPort, e)
- continue
}
if res := c.filterValidateReply(session, tuple); res != nil {
addr.DestIP = res.Src.String()
- return true
+ addr.NeedConnTrack = false
+ log.Debugf("update real peer address from conntrack:
%s:%d", addr.DestIP, addr.DestPort)
+ return nil
}
}
- return false
+ return nil
}
func (c *ConnTrack) parseSocketToTuple(addr *SocketPair) *conntrack.IPTuple {
diff --git a/pkg/tools/ip/tcpresolver.go b/pkg/tools/ip/tcpresolver.go
index a07c216..9f09309 100644
--- a/pkg/tools/ip/tcpresolver.go
+++ b/pkg/tools/ip/tcpresolver.go
@@ -34,6 +34,8 @@ type SocketPair struct {
SrcPort uint16
DestIP string
DestPort uint16
+
+ NeedConnTrack bool
}
func (s *SocketPair) IsValid() bool {