This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit 7d21644a1cf1b3665129bf6a48d869552e81b6cf
Author: mrproliu <[email protected]>
AuthorDate: Fri Dec 27 13:07:08 2024 +0800

    Reduce handle connect event time in the access log module
---
 CHANGES.md                                         |   1 +
 bpf/accesslog/common/connection.h                  |   3 +-
 bpf/accesslog/common/data_args.h                   |   6 +-
 bpf/accesslog/syscalls/connect_conntrack.c         |   5 +-
 bpf/accesslog/syscalls/connect_conntrack.h         |   2 +-
 go.mod                                             |   4 +-
 go.sum                                             |  11 +-
 pkg/accesslog/collector/connection.go              |  79 ++++----
 pkg/accesslog/collector/protocols/connection.go    |   1 -
 pkg/accesslog/collector/protocols/queue.go         |  33 +---
 pkg/accesslog/common/connection.go                 | 214 +++++----------------
 pkg/accesslog/common/filter.go                     |   5 +
 pkg/accesslog/events/close.go                      |   4 +-
 pkg/accesslog/events/connect.go                    |   4 +-
 pkg/accesslog/events/data.go                       |   8 +-
 pkg/accesslog/events/detail.go                     |   4 +-
 pkg/accesslog/events/events_test.go                |  16 +-
 pkg/accesslog/events/ztunnel.go                    |   6 +-
 pkg/process/api.go                                 |   2 +
 pkg/process/finders/kubernetes/finder.go           |  58 +++++-
 pkg/process/finders/manager.go                     |   9 +
 pkg/process/module.go                              |   9 +
 pkg/profiling/task/network/analyze/events/data.go  |   4 +
 .../task/network/analyze/layer7/events.go          |   4 +-
 pkg/tools/btf/linker.go                            |  63 ++++--
 pkg/tools/btf/queue.go                             |  74 -------
 pkg/tools/btf/{reader => }/reader.go               |  75 ++++++--
 pkg/tools/buffer/buffer.go                         |  85 ++++----
 pkg/tools/ip.go                                    |  24 ++-
 pkg/tools/ip/conntrack.go                          |  17 +-
 pkg/tools/ip/tcpresolver.go                        |   2 +
 31 files changed, 399 insertions(+), 433 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3d54791..16fc6fd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -21,6 +21,7 @@ Release Notes.
 * Add warning log when the event queue almost full in the access log module.
 * Reduce unessential `conntrack` query when detect new connection.
 * Reduce CPU and memory usage in the access log module.
+* Reduce handle connection event time in the access log module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/bpf/accesslog/common/connection.h 
b/bpf/accesslog/common/connection.h
index 880826d..834737a 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -27,7 +27,7 @@
 // syscall:connect
 struct connect_args_t {
     __u32 fd;
-    __u32 fix;
+    __u32 has_remote;
     struct sockaddr* addr;
     struct sock *sock;
     __u64 start_nacs;
@@ -305,6 +305,7 @@ static __inline void submit_connection_when_not_exists(void 
*ctx, __u64 id, stru
 
 static __inline void notify_close_connection(void* ctx, __u64 conid, struct 
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
     bpf_map_delete_elem(&socket_data_last_id_map, &conid);
+    bpf_map_delete_elem(&socket_data_id_generate_map, &conid);
     struct socket_close_event_t *close_event;
     close_event = rover_reserve_buf(&socket_close_event_queue, 
sizeof(*close_event));
     if (close_event == NULL) {
diff --git a/bpf/accesslog/common/data_args.h b/bpf/accesslog/common/data_args.h
index 2d76062..33eeea6 100644
--- a/bpf/accesslog/common/data_args.h
+++ b/bpf/accesslog/common/data_args.h
@@ -66,14 +66,14 @@ struct sock_data_args_t {
 };
 struct {
        __uint(type, BPF_MAP_TYPE_HASH);
-       __uint(max_entries, 10000);
+       __uint(max_entries, 65535);
        __type(key, __u64);
        __type(value, struct sock_data_args_t);
 } socket_data_args SEC(".maps");
 
 struct {
-       __uint(type, BPF_MAP_TYPE_HASH);
-       __uint(max_entries, 10000);
+       __uint(type, BPF_MAP_TYPE_LRU_HASH);
+       __uint(max_entries, 65535);
        __type(key, __u64);
        __type(value, __u64);
 } socket_data_id_generate_map SEC(".maps");
diff --git a/bpf/accesslog/syscalls/connect_conntrack.c 
b/bpf/accesslog/syscalls/connect_conntrack.c
index 4efac48..8f1f5cb 100644
--- a/bpf/accesslog/syscalls/connect_conntrack.c
+++ b/bpf/accesslog/syscalls/connect_conntrack.c
@@ -93,7 +93,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, 
struct nf_conn *ct
     }
 
     // already contains the remote address
-    if (&(connect_args->remote) != NULL) {
+    if (connect_args->has_remote && &(connect_args->remote) != NULL) {
         return 0;
     }
 
@@ -126,6 +126,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* 
ctx, struct nf_conn *ct
     remote.ipl = reply_conn.saddr_l;
     remote.port = reply_conn.sport;
     connect_args->remote = remote;
+    connect_args->has_remote = 1;
     bpf_map_update_elem(&conecting_args, &id, connect_args, 0);
 
     return 0;
@@ -144,4 +145,4 @@ int nf_confirm(struct pt_regs* ctx) {
 SEC("kprobe/ctnetlink_fill_info")
 int nf_ctnetlink_fill_info(struct pt_regs* ctx) {
     return nf_conn_aware(ctx, (struct nf_conn*)PT_REGS_PARM5(ctx));
-}
+}
\ No newline at end of file
diff --git a/bpf/accesslog/syscalls/connect_conntrack.h 
b/bpf/accesslog/syscalls/connect_conntrack.h
index 0e67b9c..06e5072 100644
--- a/bpf/accesslog/syscalls/connect_conntrack.h
+++ b/bpf/accesslog/syscalls/connect_conntrack.h
@@ -96,4 +96,4 @@ struct nf_conn {
        struct nf_conntrack_tuple_hash tuplehash[IP_CT_DIR_MAX];
        long unsigned int status;
        __u32 mark;
-} __attribute__((preserve_access_index));
+} __attribute__((preserve_access_index));
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 7139cf3..bb05af7 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ require (
        github.com/sirupsen/logrus v1.9.3
        github.com/spf13/cobra v1.3.0
        github.com/spf13/viper v1.10.1
-       github.com/stretchr/testify v1.8.1
+       github.com/stretchr/testify v1.8.4
        github.com/zekroTJA/timedmap v1.4.0
        golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
        golang.org/x/net v0.32.0
@@ -46,7 +46,7 @@ require (
        github.com/json-iterator/go v1.1.12 // indirect
        github.com/magiconair/properties v1.8.5 // indirect
        github.com/mdlayher/netlink v1.7.2 // indirect
-       github.com/mdlayher/socket v0.4.1 // indirect
+       github.com/mdlayher/socket v0.5.1 // indirect
        github.com/mitchellh/mapstructure v1.4.3 // indirect
        github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
        github.com/modern-go/reflect2 v1.0.2 // indirect
diff --git a/go.sum b/go.sum
index 1cc3adf..e15c3cc 100644
--- a/go.sum
+++ b/go.sum
@@ -369,8 +369,8 @@ github.com/mdlayher/netlink v1.7.2/go.mod 
h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU
 github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod 
h1:GAFlyu4/XV68LkQKYzKhIo/WW7j3Zi0YRAz/BOoanUc=
 github.com/mdlayher/socket v0.0.0-20211007213009-516dcbdf0267/go.mod 
h1:nFZ1EtZYK8Gi/k6QNu7z7CgO20i/4ExeQswwWuPmG/g=
 github.com/mdlayher/socket v0.1.0/go.mod 
h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs=
-github.com/mdlayher/socket v0.4.1 
h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
-github.com/mdlayher/socket v0.4.1/go.mod 
h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA=
+github.com/mdlayher/socket v0.5.1 
h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos=
+github.com/mdlayher/socket v0.5.1/go.mod 
h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ=
 github.com/miekg/dns v1.0.14/go.mod 
h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/miekg/dns v1.1.26/go.mod 
h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
 github.com/miekg/dns v1.1.41/go.mod 
h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
@@ -467,7 +467,6 @@ github.com/spf13/viper v1.10.1/go.mod 
h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8q
 github.com/stoewer/go-strcase v1.2.0/go.mod 
h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.4.0/go.mod 
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
 github.com/stretchr/objx v0.5.0/go.mod 
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -476,10 +475,8 @@ github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
 github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.6.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1 
h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
-github.com/stretchr/testify v1.8.1/go.mod 
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4 
h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/subosito/gotenv v1.2.0 
h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod 
h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tklauser/go-sysconf v0.3.9 
h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index 4fc6823..b7930e2 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -33,6 +33,8 @@ import (
        "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/module"
+       "github.com/apache/skywalking-rover/pkg/process"
+       "github.com/apache/skywalking-rover/pkg/process/api"
        "github.com/apache/skywalking-rover/pkg/tools"
        "github.com/apache/skywalking-rover/pkg/tools/btf"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
@@ -56,7 +58,7 @@ func NewConnectionCollector() *ConnectCollector {
        return &ConnectCollector{}
 }
 
-func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext) error {
+func (c *ConnectCollector) Start(m *module.Manager, ctx 
*common.AccessLogContext) error {
        perCPUBufferSize, err := 
units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
        if err != nil {
                return err
@@ -73,13 +75,9 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
                return fmt.Errorf("the queue size be small than 1")
        }
-       track, err := ip.NewConnTrack()
-       if err != nil {
-               connectionLogger.Warnf("cannot create the connection tracker, 
%v", err)
-       }
        c.eventQueue = btf.NewEventQueue("connection resolver", 
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
                ctx.Config.ConnectionAnalyze.QueueSize, func(num int) 
btf.PartitionContext {
-                       return newConnectionPartitionContext(ctx, track)
+                       return NewConnectionPartitionContext(ctx, 
m.FindModule(process.ModuleName).(process.K8sOperator))
                })
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize),
                ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} 
{
@@ -130,18 +128,18 @@ func (c *ConnectCollector) Stop() {
 
 type ConnectionPartitionContext struct {
        context     *common.AccessLogContext
-       connTracker *ip.ConnTrack
+       k8sOperator process.K8sOperator
 }
 
-func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker 
*ip.ConnTrack) *ConnectionPartitionContext {
+func NewConnectionPartitionContext(ctx *common.AccessLogContext,
+       k8sOperator process.K8sOperator) *ConnectionPartitionContext {
        return &ConnectionPartitionContext{
                context:     ctx,
-               connTracker: connTracker,
+               k8sOperator: k8sOperator,
        }
 }
 
 func (c *ConnectionPartitionContext) Start(ctx context.Context) {
-
 }
 
 func (c *ConnectionPartitionContext) Consume(data interface{}) {
@@ -151,7 +149,7 @@ func (c *ConnectionPartitionContext) Consume(data 
interface{}) {
                        "pid: %d, fd: %d, role: %s: func: %s, family: %d, 
success: %d, conntrack exist: %t",
                        event.ConID, event.RandomID, event.PID, event.SocketFD, 
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
                        event.SocketFamily, event.ConnectSuccess, 
event.ConnTrackUpstreamPort != 0)
-               socketPair := c.buildSocketFromConnectEvent(event)
+               socketPair := c.BuildSocketFromConnectEvent(event)
                if socketPair == nil {
                        connectionLogger.Debugf("cannot found the socket paire 
from connect event, connection ID: %d, randomID: %d",
                                event.ConID, event.RandomID)
@@ -159,7 +157,6 @@ func (c *ConnectionPartitionContext) Consume(data 
interface{}) {
                }
                connectionLogger.Debugf("build socket pair success, connection 
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
                        event.ConID, event.RandomID, socketPair.Role, 
socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
-               c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
                forwarder.SendConnectEvent(c.context, event, socketPair)
        case *events.SocketCloseEvent:
                connectionLogger.Debugf("receive close event, connection ID: 
%d, randomID: %d, pid: %d, fd: %d",
@@ -169,7 +166,7 @@ func (c *ConnectionPartitionContext) Consume(data 
interface{}) {
        }
 }
 
-func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event 
*events.SocketConnectEvent, result *ip.SocketPair) {
+func (c *ConnectionPartitionContext) FixSocketFamilyIfNeed(event 
*events.SocketConnectEvent, result *ip.SocketPair) {
        if result == nil {
                return
        }
@@ -189,39 +186,39 @@ func (c *ConnectionPartitionContext) 
fixSocketFamilyIfNeed(event *events.SocketC
        }
 }
 
-func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event 
*events.SocketConnectEvent) *ip.SocketPair {
+func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event 
*events.SocketConnectEvent) *ip.SocketPair {
        if event.SocketFamily != unix.AF_INET && event.SocketFamily != 
unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
                // if not ipv4, ipv6 or unknown, ignore
                return nil
        }
-       socketPair := c.buildSocketPair(event)
-       if socketPair != nil && socketPair.IsValid() {
+       pair := c.BuildSocketPair(event)
+       if pair != nil && pair.IsValid() {
                connectionLogger.Debugf("found the connection from the connect 
event is valid, connection ID: %d, randomID: %d",
                        event.ConID, event.RandomID)
-               return socketPair
+               return pair
        }
        // if only the local port not success, maybe the upstream port is not 
open, so it could be continued
-       if c.isOnlyLocalPortEmpty(socketPair) {
+       if c.IsOnlyLocalPortEmpty(pair) {
                event.ConnectSuccess = 0
                connectionLogger.Debugf("the connection from the connect event 
is only the local port is empty, connection ID: %d, randomID: %d",
                        event.ConID, event.RandomID)
-               return socketPair
+               return pair
        }
 
        pair, err := ip.ParseSocket(event.PID, event.SocketFD)
        if err != nil {
-               connectionLogger.Debugf("cannot found the socket, pid: %d, 
socket FD: %d", event.PID, event.SocketFD)
+               connectionLogger.Debugf("cannot found the socket, pid: %d, 
socket FD: %d, error: %v", event.PID, event.SocketFD, err)
                return nil
        }
        connectionLogger.Debugf("found the connection from the socket, 
connection ID: %d, randomID: %d",
                event.ConID, event.RandomID)
        pair.Role = enums.ConnectionRole(event.Role)
-       c.fixSocketFamilyIfNeed(event, pair)
-       c.tryToUpdateSocketFromConntrack(event, pair)
+       c.FixSocketFamilyIfNeed(event, pair)
+       c.CheckNeedConntrack(event, pair)
        return pair
 }
 
-func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair 
*ip.SocketPair) bool {
+func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair 
*ip.SocketPair) bool {
        if socketPair == nil {
                return false
        }
@@ -233,7 +230,7 @@ func (c *ConnectionPartitionContext) 
isOnlyLocalPortEmpty(socketPair *ip.SocketP
        return socketPair.IsValid()
 }
 
-func (c *ConnectionPartitionContext) buildSocketPair(event 
*events.SocketConnectEvent) *ip.SocketPair {
+func (c *ConnectionPartitionContext) BuildSocketPair(event 
*events.SocketConnectEvent) *ip.SocketPair {
        var result *ip.SocketPair
        haveConnTrack := false
        if event.SocketFamily == unix.AF_INET {
@@ -288,22 +285,28 @@ func (c *ConnectionPartitionContext) 
buildSocketPair(event *events.SocketConnect
                return result
        }
 
-       c.fixSocketFamilyIfNeed(event, result)
-       c.tryToUpdateSocketFromConntrack(event, result)
+       c.FixSocketFamilyIfNeed(event, result)
+       c.CheckNeedConntrack(event, result)
        return result
 }
 
-func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event 
*events.SocketConnectEvent, socket *ip.SocketPair) {
-       if socket != nil && socket.IsValid() && c.connTracker != nil && 
!tools.IsLocalHostAddress(socket.DestIP) &&
-               event.FuncName != enums.SocketFunctionNameAccept { // accept 
event don't need to update the remote address
-               // if no contract and socket data is valid, then trying to get 
the remote address from the socket
-               // to encase the remote address is not the real remote address
-               originalIP := socket.DestIP
-               originalPort := socket.DestPort
-               if c.connTracker.UpdateRealPeerAddress(socket) {
-                       connectionLogger.Debugf("update the socket address from 
conntrack success, "+
-                               "connection ID: %d, randomID: %d, original 
remote: %s:%d, new remote: %s:%d",
-                               event.ConID, event.RandomID, originalIP, 
originalPort, socket.DestIP, socket.DestPort)
-               }
+func (c *ConnectionPartitionContext) CheckNeedConntrack(event 
*events.SocketConnectEvent, socket *ip.SocketPair) {
+       if socket == nil || !socket.IsValid() || 
tools.IsLocalHostAddress(socket.DestIP) ||
+               event.FuncName == enums.SocketFunctionNameAccept || // accept 
event don't need to update the remote address
+               !c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, 
api.Kubernetes) { // only the k8s process need to update the remote address 
from conntrack
+               return
+       }
+
+       isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
+       if err != nil {
+               connectionLogger.Warnf("cannot found the pod IP, connection ID: 
%d, randomID: %d, error: %v",
+                       event.ConID, event.RandomID, err)
+       }
+       if isPodIP {
+               connectionLogger.Debugf("detect the remote IP is pod IP, 
connection ID: %d, randomID: %d, remote: %s",
+                       event.ConID, event.RandomID, socket.DestIP)
+               return
        }
+       // update to the socket need to update the remote address from conntrack
+       socket.NeedConnTrack = true
 }
diff --git a/pkg/accesslog/collector/protocols/connection.go 
b/pkg/accesslog/collector/protocols/connection.go
index b6a9e24..66d0ce9 100644
--- a/pkg/accesslog/collector/protocols/connection.go
+++ b/pkg/accesslog/collector/protocols/connection.go
@@ -34,7 +34,6 @@ type PartitionConnection struct {
        protocolAnalyzer       map[enums.ConnectionProtocol]Protocol
        protocolMetrics        map[enums.ConnectionProtocol]ProtocolMetrics
        closed                 bool
-       closeCallback          common.ConnectionProcessFinishCallback
        skipAllDataAnalyze     bool
        lastCheckCloseTime     time.Time
 }
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index a4577f7..b336f9d 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -102,7 +102,7 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, 
int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
-                       return &events.SocketDataUploadEvent{}
+                       return &events.SocketDataUploadEvent{Buffer: 
*buffer.BorrowNewBuffer()}
                }, func(data interface{}) int {
                        return 
int(data.(*events.SocketDataUploadEvent).ConnectionID)
                })
@@ -166,20 +166,6 @@ func NewPartitionContext(ctx *common.AccessLogContext, num 
int, protocols []Prot
        return pc
 }
 
-func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent, 
closeCallback common.ConnectionProcessFinishCallback) {
-       conn, exist := 
p.connections.Get(p.buildConnectionKey(event.GetConnectionID(), 
event.GetRandomID()))
-       if !exist {
-               log.Debugf("connection is not exist in the partion context, 
connection ID: %d, random ID: %d, partition number: %d",
-                       event.GetConnectionID(), event.GetRandomID(), 
p.partitionNum)
-               closeCallback()
-               return
-       }
-       connection := conn.(*PartitionConnection)
-       connection.closeCallback = closeCallback
-       log.Debugf("receive the connection close event and mark is closable, 
connection ID: %d, random ID: %d, partition number: %d",
-               event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
-}
-
 func (p *PartitionContext) Start(ctx context.Context) {
        // process events with interval
        flushDuration, _ := time.ParseDuration(p.context.Config.Flush.Period)
@@ -189,7 +175,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
                        select {
                        case <-timeTicker.C:
                                // process event with interval
-                               p.processEvents()
+                               p.ProcessEvents()
                        case <-ctx.Done():
                                timeTicker.Stop()
                                return
@@ -203,7 +189,7 @@ func (p *PartitionContext) Start(ctx context.Context) {
                for {
                        select {
                        case <-expireTicker.C:
-                               p.processExpireEvents()
+                               p.ProcessExpireEvents()
                        case <-ctx.Done():
                                expireTicker.Stop()
                                return
@@ -225,19 +211,19 @@ func (p *PartitionContext) Consume(data interface{}) {
                        forwarder.SendTransferNoProtocolEvent(p.context, event)
                        return
                }
-               connection := p.getConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol(), event.DataID())
+               connection := p.GetConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol(), event.DataID())
                connection.AppendDetail(p.context, event)
        case *events.SocketDataUploadEvent:
                pid, _ := events.ParseConnectionID(event.ConnectionID)
                log.Debugf("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, prev data id: %d, "+
                        "data id: %d, sequence: %d, protocol: %d",
                        event.ConnectionID, event.RandomID, pid, 
event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
-               connection := p.getConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol0, event.DataID0)
+               connection := p.GetConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol0, event.DataID0)
                connection.AppendData(event)
        }
 }
 
-func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
+func (p *PartitionContext) GetConnectionContext(connectionID, randomID uint64,
        protocol enums.ConnectionProtocol, currentDataID uint64) 
*PartitionConnection {
        conKey := p.buildConnectionKey(connectionID, randomID)
        conn, exist := p.connections.Get(conKey)
@@ -259,7 +245,7 @@ func (p *PartitionContext) buildConnectionKey(conID, ranID 
uint64) string {
        return string(buf)
 }
 
-func (p *PartitionContext) processEvents() {
+func (p *PartitionContext) ProcessEvents() {
        // it could be triggered by interval or reach counter
        // if any trigger bean locked, the other one just ignore process
        if !p.analyzeLocker.TryLock() {
@@ -284,9 +270,6 @@ func (p *PartitionContext) processEvents() {
                        p.checkTheConnectionIsAlreadyClose(info)
                }
                if info.closed {
-                       if info.closeCallback != nil {
-                               info.closeCallback()
-                       }
                        closedConnections = append(closedConnections, conKey)
                        log.Debugf("detect the connection is already closed, 
then notify to the callback, connection ID: %d, random ID: %d, partition 
number: %d",
                                info.connectionID, info.randomID, 
p.partitionNum)
@@ -320,7 +303,7 @@ func (p *PartitionContext) 
checkTheConnectionIsAlreadyClose(con *PartitionConnec
        }
 }
 
-func (p *PartitionContext) processExpireEvents() {
+func (p *PartitionContext) ProcessExpireEvents() {
        // the expiry must be mutual exclusion with events processor
        p.analyzeLocker.Lock()
        defer p.analyzeLocker.Unlock()
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index afac079..1c0a86e 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -42,16 +42,11 @@ import (
 
        cmap "github.com/orcaman/concurrent-map"
 
-       "k8s.io/apimachinery/pkg/util/cache"
-
        v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
 const (
-       // only using to match the remote IP address
-       localAddressPairCacheTime = time.Second * 15
-
        // clean the active connection in BPF interval
        cleanActiveConnectionInterval = time.Second * 20
 
@@ -62,19 +57,6 @@ const (
        connectionCheckExistTime = time.Second * 30
 )
 
-type addressProcessType int
-
-const (
-       addressProcessTypeUnknown addressProcessType = iota
-       addressProcessTypeLocal
-       addressProcessTypeKubernetes
-)
-
-const (
-       strLocal  = "local"
-       strRemote = "remote"
-)
-
 type ConnectEventWithSocket struct {
        *events.SocketConnectEvent
        SocketPair *ip.SocketPair
@@ -82,13 +64,11 @@ type ConnectEventWithSocket struct {
 
 type CloseEventWithNotify struct {
        *events.SocketCloseEvent
-       allProcessorFinished bool
 }
 
 type ConnectionProcessFinishCallback func()
 
 type ConnectionProcessor interface {
-       OnConnectionClose(event *events.SocketCloseEvent, callback 
ConnectionProcessFinishCallback)
 }
 
 type FlusherListener interface {
@@ -105,10 +85,6 @@ type ConnectionManager struct {
        moduleMgr   *module.Manager
        processOP   process.Operator
        connections cmap.ConcurrentMap
-       // addressWithPid cache all local ip+port and pid mapping for match the 
process on the same host
-       // such as service mesh(process with envoy)
-       addressWithPid   *cache.Expiring
-       localPortWithPid *cache.Expiring // in some case, we can only get the 
127.0.0.1 from server side, so we only cache the port for this
        // localIPWithPid cache all local monitoring process bind IP address
        // for checking the remote address is local or not
        localIPWithPid map[string]int32
@@ -124,10 +100,9 @@ type ConnectionManager struct {
        processors       []ConnectionProcessor
        processListeners []ProcessListener
 
-       // connection already close but the connection (protocols)log not build 
finished
-       allUnfinishedConnections map[string]*bool
-
        flushListeners []FlusherListener
+
+       connectTracker *ip.ConnTrack
 }
 
 func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) {
@@ -142,10 +117,6 @@ func (c *ConnectionManager) 
RegisterNewFlushListener(listener FlusherListener) {
        c.flushListeners = append(c.flushListeners, listener)
 }
 
-type addressInfo struct {
-       pid uint32
-}
-
 type ConnectionInfo struct {
        ConnectionID       uint64
        RandomID           uint64
@@ -158,19 +129,21 @@ type ConnectionInfo struct {
 }
 
 func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader 
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
+       track, err := ip.NewConnTrack()
+       if err != nil {
+               log.Warnf("cannot create the connection tracker, %v", err)
+       }
        mgr := &ConnectionManager{
-               moduleMgr:                moduleMgr,
-               processOP:                
moduleMgr.FindModule(process.ModuleName).(process.Operator),
-               connections:              cmap.New(),
-               addressWithPid:           cache.NewExpiring(),
-               localPortWithPid:         cache.NewExpiring(),
-               localIPWithPid:           make(map[string]int32),
-               monitoringProcesses:      
make(map[int32][]api.ProcessInterface),
-               processMonitorMap:        bpfLoader.ProcessMonitorControl,
-               activeConnectionMap:      bpfLoader.ActiveConnectionMap,
-               allUnfinishedConnections: make(map[string]*bool),
-               monitorFilter:            filter,
-               flushListeners:           make([]FlusherListener, 0),
+               moduleMgr:           moduleMgr,
+               processOP:           
moduleMgr.FindModule(process.ModuleName).(process.Operator),
+               connections:         cmap.New(),
+               localIPWithPid:      make(map[string]int32),
+               monitoringProcesses: make(map[int32][]api.ProcessInterface),
+               processMonitorMap:   bpfLoader.ProcessMonitorControl,
+               activeConnectionMap: bpfLoader.ActiveConnectionMap,
+               monitorFilter:       filter,
+               flushListeners:      make([]FlusherListener, 0),
+               connectTracker:      track,
        }
        return mgr
 }
@@ -276,39 +249,26 @@ func (c *ConnectionManager) Find(event events.Event) 
*ConnectionInfo {
 }
 
 func (c *ConnectionManager) buildRemoteAddress(e *events.SocketConnectEvent, 
socket *ip.SocketPair) *v3.ConnectionAddress {
-       tp := c.isLocalTarget(socket)
-       if tp == addressProcessTypeUnknown {
-               log.Debugf("building the remote address to unknown, connection: 
%d-%d, role: %s, local: %s:%d, remote: %s:%d",
-                       e.GetConnectionID(), e.GetRandomID(), socket.Role, 
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
-               return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
+       // if the remote address is local, then no needs to build the 
address(access log no need to send by communicate with self)
+       if tools.IsLocalHostAddress(socket.DestIP) {
+               return nil
        }
 
-       var addrInfo *addressInfo
-       var fromType string
-       switch socket.Role {
-       case enums.ConnectionRoleClient:
-               addrInfo = c.getAddressPid(socket.SrcIP, socket.SrcPort, false)
-               fromType = strLocal
-       case enums.ConnectionRoleServer:
-               addrInfo = c.getAddressPid(socket.DestIP, socket.DestPort, true)
-               fromType = strRemote
-       }
-
-       if addrInfo != nil {
-               log.Debugf("building the remote address from %s process, pid: 
%d, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d",
-                       fromType, addrInfo.pid, e.GetConnectionID(), 
e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, 
socket.DestPort)
-               return c.buildLocalAddress(addrInfo.pid, socket.DestPort, 
socket)
-       } else if tp == addressProcessTypeKubernetes {
-               if p := c.localIPWithPid[socket.DestIP]; p != 0 {
-                       log.Debugf("building the remote address from kubernetes 
process, connection: %d-%d, role: %s, pid: %d, local: %s:%d, remote: %s:%d",
-                               e.GetConnectionID(), e.GetRandomID(), 
socket.Role, p, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
-                       return c.buildLocalAddress(uint32(p), socket.DestPort, 
socket)
+       // if the remote connection is need to use conntrack, then update the 
real peer address
+       if socket.NeedConnTrack {
+               if err := c.connectTracker.UpdateRealPeerAddress(socket); err 
!= nil {
+                       log.Debugf("cannot update the real peer address, %v", 
err)
                }
        }
 
-       log.Debugf("cannot found the peer pid for the connection: %d-%d, remote 
type: %v, role: %s, local: %s:%d, remote: %s:%d",
-               e.GetConnectionID(), e.GetRandomID(), tp, socket.Role, 
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
-       return nil
+       // found local address with pid
+       if pid, exist := c.localIPWithPid[socket.DestIP]; exist && pid != 0 {
+               return c.buildLocalAddress(uint32(pid), socket.DestPort, socket)
+       }
+
+       log.Debugf("building the remote address to unknown, connection: %d-%d, 
role: %s, local: %s:%d, remote: %s:%d",
+               e.GetConnectionID(), e.GetRandomID(), socket.Role, 
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
+       return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
 }
 
 func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, 
event events.Event) {
@@ -317,12 +277,7 @@ func (c *ConnectionManager) 
connectionPostHandle(connection *ConnectionInfo, eve
        }
        switch e := event.(type) {
        case *CloseEventWithNotify:
-               if e.allProcessorFinished {
-                       connection.MarkDeletable = true
-               } else {
-                       // if not all processor finished, then add into the map
-                       c.allUnfinishedConnections[fmt.Sprintf("%d_%d", 
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
-               }
+               connection.MarkDeletable = true
        case events.SocketDetail:
                tlsMode := connection.RPCConnection.TlsMode
                protocol := connection.RPCConnection.Protocol
@@ -368,6 +323,17 @@ func (c *ConnectionManager) ProcessIsMonitor(pid uint32) 
bool {
        return len(c.monitoringProcesses[int32(pid)]) > 0
 }
 
+func (c *ConnectionManager) ProcessIsDetectBy(pid uint32, detectType 
api.ProcessDetectType) bool {
+       c.monitoringProcessLock.RLock()
+       defer c.monitoringProcessLock.RUnlock()
+       for _, p := range c.monitoringProcesses[int32(pid)] {
+               if p.DetectType() == detectType {
+                       return true
+               }
+       }
+       return false
+}
+
 func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent, 
socket *ip.SocketPair,
        local, remote *v3.ConnectionAddress) *ConnectionInfo {
        var role v32.DetectPoint
@@ -436,85 +402,9 @@ func (c *ConnectionManager) buildAddressFromRemote(ipHost 
string, port uint16) *
 }
 
 func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent) 
*CloseEventWithNotify {
-       result := &CloseEventWithNotify{
-               SocketCloseEvent:     event,
-               allProcessorFinished: false,
-       }
-       processCount := len(c.processors)
-       for _, l := range c.processors {
-               l.OnConnectionClose(event, func() {
-                       processCount--
-                       if processCount > 0 {
-                               return
-                       }
-                       result.allProcessorFinished = true
-               })
+       return &CloseEventWithNotify{
+               SocketCloseEvent: event,
        }
-       return result
-}
-
-func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16, 
localPid bool, pid uint32) {
-       localAddrInfo := &addressInfo{
-               pid: pid,
-       }
-       c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port, 
localPid), localAddrInfo, localAddressPairCacheTime)
-       localStr := strRemote
-       if localPid {
-               localStr = strLocal
-       }
-       log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: 
%d", localStr, hostAddress, port, pid)
-}
-
-func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16, 
localPid bool) *addressInfo {
-       addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", 
hostAddress, port, localPid))
-       if ok && addrInfo != nil {
-               return addrInfo.(*addressInfo)
-       }
-       return nil
-}
-
-func (c *ConnectionManager) OnConnectEvent(event *events.SocketConnectEvent, 
pair *ip.SocketPair) {
-       // only adding the local ip port when remote is local address
-       switch c.isLocalTarget(pair) {
-       case addressProcessTypeUnknown:
-               log.Debugf("the target address is not local, so no needs to 
save the cache. "+
-                       "address: %s:%d, pid: %d", pair.DestIP, pair.DestPort, 
event.PID)
-       case addressProcessTypeLocal:
-               switch pair.Role {
-               case enums.ConnectionRoleClient:
-                       // if current is client, so the local port should be 
unique
-                       c.savingTheAddress(pair.SrcIP, pair.SrcPort, true, 
event.PID)
-               case enums.ConnectionRoleServer:
-                       // if current is server, so the remote port should be 
unique
-                       c.savingTheAddress(pair.DestIP, pair.DestPort, false, 
event.PID)
-               case enums.ConnectionRoleUnknown:
-                       log.Debugf("the target address local but unknown role, 
so no needs to save the cache. socket: [%s], pid: %d",
-                               pair, event.PID)
-               }
-       case addressProcessTypeKubernetes:
-               switch pair.Role {
-               case enums.ConnectionRoleClient:
-                       // if current is client, so the local port should be 
unique
-                       c.savingTheAddress(pair.SrcIP, pair.SrcPort, true, 
event.PID)
-               case enums.ConnectionRoleServer:
-                       // if current is server, so the remote port should be 
unique
-                       c.savingTheAddress(pair.DestIP, pair.DestPort, false, 
event.PID)
-               case enums.ConnectionRoleUnknown:
-                       log.Debugf("the target address kubernetes but unknown 
role, so no needs to save the cache. socket: [%s], pid: %d",
-                               pair, event.PID)
-               }
-       }
-}
-
-func (c *ConnectionManager) isLocalTarget(pair *ip.SocketPair) 
addressProcessType {
-       destIP := pair.DestIP
-       if tools.IsLocalHostAddress(destIP) {
-               return addressProcessTypeLocal
-       }
-       if _, exist := c.localIPWithPid[destIP]; exist {
-               return addressProcessTypeKubernetes
-       }
-       return addressProcessTypeUnknown
 }
 
 func (c *ConnectionManager) AddNewProcess(pid int32, entities 
[]api.ProcessInterface) {
@@ -706,20 +596,6 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() 
{
                }
        })
 
-       deleteFromUnfinished := make([]string, 0)
-       for conKey, processorFinished := range c.allUnfinishedConnections {
-               if *processorFinished {
-                       deletableConnections[conKey] = true
-                       deleteFromUnfinished = append(deleteFromUnfinished, 
conKey)
-               } else {
-                       // if the processor not finished, then ignore it from 
deletable connections
-                       delete(deletableConnections, conKey)
-               }
-       }
-       for _, key := range deleteFromUnfinished {
-               delete(c.allUnfinishedConnections, key)
-       }
-
        for key := range deletableConnections {
                log.Debugf("deleting the connection in manager: %s", key)
                c.connections.Remove(key)
diff --git a/pkg/accesslog/common/filter.go b/pkg/accesslog/common/filter.go
index 0d60012..34baee3 100644
--- a/pkg/accesslog/common/filter.go
+++ b/pkg/accesslog/common/filter.go
@@ -18,6 +18,7 @@
 package common
 
 import (
+       "os"
        "strings"
 
        "github.com/apache/skywalking-rover/pkg/process/api"
@@ -46,7 +47,11 @@ func NewStaticMonitorFilter(namespaces, clusters []string) 
*StaticMonitorFilter
 }
 
 func (s *StaticMonitorFilter) ShouldIncludeProcesses(processes 
[]api.ProcessInterface) (res []api.ProcessInterface) {
+       var selfPid = os.Getpid()
        for _, entity := range processes {
+               if int(entity.Pid()) == selfPid {
+                       continue
+               }
                if entity.DetectType() != api.Kubernetes { // for now, we only 
have the kubernetes detected processes
                        continue
                }
diff --git a/pkg/accesslog/events/close.go b/pkg/accesslog/events/close.go
index acd0d61..d4aaa1b 100644
--- a/pkg/accesslog/events/close.go
+++ b/pkg/accesslog/events/close.go
@@ -20,7 +20,7 @@ package events
 import (
        "time"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "github.com/apache/skywalking-rover/pkg/tools/host"
 )
 
@@ -35,7 +35,7 @@ type SocketCloseEvent struct {
        Success   uint32
 }
 
-func (c *SocketCloseEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketCloseEvent) ReadFrom(r btf.Reader) {
        c.ConnectionID = r.ReadUint64()
        c.RandomID = r.ReadUint64()
        c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/connect.go b/pkg/accesslog/events/connect.go
index 9433168..daf6458 100644
--- a/pkg/accesslog/events/connect.go
+++ b/pkg/accesslog/events/connect.go
@@ -20,7 +20,7 @@ package events
 import (
        "time"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "github.com/apache/skywalking-rover/pkg/tools/host"
 )
 
@@ -47,7 +47,7 @@ type SocketConnectEvent struct {
        ConnTrackUpstreamPort uint32
 }
 
-func (c *SocketConnectEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketConnectEvent) ReadFrom(r btf.Reader) {
        c.ConID = r.ReadUint64()
        c.RandomID = r.ReadUint64()
        c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 6ecfd2f..85bbd76 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -20,7 +20,7 @@ package events
 import (
        "fmt"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
 )
 
@@ -41,7 +41,11 @@ type SocketDataUploadEvent struct {
        Buffer       [2048]byte
 }
 
-func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
+func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte {
+       return &s.Buffer
+}
+
+func (s *SocketDataUploadEvent) ReadFrom(r btf.Reader) {
        s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8())
        s.HaveReduce = r.ReadUint8()
        s.Direction0 = enums.SocketDataDirection(r.ReadUint8())
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index a5ac46c..5beb5df 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -20,7 +20,7 @@ package events
 import (
        "time"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "github.com/apache/skywalking-rover/pkg/tools/buffer"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
        "github.com/apache/skywalking-rover/pkg/tools/host"
@@ -73,7 +73,7 @@ type SocketDetailEvent struct {
        SSL                           uint8
 }
 
-func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) {
+func (d *SocketDetailEvent) ReadFrom(r btf.Reader) {
        d.ConnectionID = r.ReadUint64()
        d.RandomID = r.ReadUint64()
        d.DataID0 = r.ReadUint64()
diff --git a/pkg/accesslog/events/events_test.go 
b/pkg/accesslog/events/events_test.go
index 1da9f3e..f2c11bd 100644
--- a/pkg/accesslog/events/events_test.go
+++ b/pkg/accesslog/events/events_test.go
@@ -25,16 +25,16 @@ import (
        "strings"
        "testing"
 
-       "github.com/stretchr/testify/assert"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+       "github.com/stretchr/testify/assert"
 )
 
 // nolint
 func TestBufferRead(t *testing.T) {
        tests := []struct {
                hex    string
-               create func() reader.EventReader
+               create func() btf.EventReader
        }{
                {
                        hex: `
@@ -46,7 +46,7 @@ func TestBufferRead(t *testing.T) {
 00 00 00 00 00 00 00 00 00 00 ff ff 7f 00 00 01
 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00 00 00 00 00`,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketConnectEvent{}
                        },
                },
@@ -59,7 +59,7 @@ func TestBufferRead(t *testing.T) {
 23 4a 01 00 bb 49 01 00 00 00 00 00 e4 01 00 00
 24 21 00 00 01 00 00 00 39 d6 00 00 00 00 00 00
 03 00 00 00 00 00 00 00 02 02 00 02 02 09 01 00`,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketDetailEvent{}
                        },
                },
@@ -69,7 +69,7 @@ func TestBufferRead(t *testing.T) {
 b2 2d 26 7c 5a 30 02 00 5e 34 26 7c 5a 30 02 00
 7a a1 00 00 04 00 00 00 00 00 00 00 00 00 00 00
 00 00 00 00`,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketCloseEvent{}
                        },
                },
@@ -209,7 +209,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
 00 00 00 00
 `,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketDataUploadEvent{}
                        },
                },
@@ -226,7 +226,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
                        }
                        binaryRead := test.create()
                        selfRead := test.create()
-                       bufReader := reader.NewReader(rawData)
+                       bufReader := btf.NewReader(rawData)
                        selfRead.ReadFrom(bufReader)
                        if err := bufReader.HasError(); err != nil {
                                t.Fatalf("reading by self parsing error: %v", 
err)
diff --git a/pkg/accesslog/events/ztunnel.go b/pkg/accesslog/events/ztunnel.go
index abd2f1c..81572c3 100644
--- a/pkg/accesslog/events/ztunnel.go
+++ b/pkg/accesslog/events/ztunnel.go
@@ -17,7 +17,9 @@
 
 package events
 
-import "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+import (
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
+)
 
 type ZTunnelSocketMappingEvent struct {
        OriginalSrcIP        uint32
@@ -30,7 +32,7 @@ type ZTunnelSocketMappingEvent struct {
        Pad1                 uint32
 }
 
-func (z *ZTunnelSocketMappingEvent) ReadFrom(r *reader.Reader) {
+func (z *ZTunnelSocketMappingEvent) ReadFrom(r btf.Reader) {
        z.OriginalSrcIP = r.ReadUint32()
        z.OriginalDestIP = r.ReadUint32()
        z.OriginalSrcPort = r.ReadUint16()
diff --git a/pkg/process/api.go b/pkg/process/api.go
index b84453b..ca11907 100644
--- a/pkg/process/api.go
+++ b/pkg/process/api.go
@@ -37,4 +37,6 @@ type Operator interface {
 type K8sOperator interface {
        // NodeName get the node name
        NodeName() string
+       // IsPodIP check the ip is pod ip
+       IsPodIP(ip string) (bool, error)
 }
diff --git a/pkg/process/finders/kubernetes/finder.go 
b/pkg/process/finders/kubernetes/finder.go
index 03a0ba7..58282ea 100644
--- a/pkg/process/finders/kubernetes/finder.go
+++ b/pkg/process/finders/kubernetes/finder.go
@@ -21,10 +21,12 @@ import (
        "bufio"
        "context"
        "fmt"
+       "hash/fnv"
        "os"
        "regexp"
        "strconv"
        "strings"
+       "sync"
        "time"
 
        lru "github.com/hashicorp/golang-lru"
@@ -36,6 +38,10 @@ import (
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+       "k8s.io/apimachinery/pkg/fields"
+       "k8s.io/apimachinery/pkg/util/cache"
+       "k8s.io/apimachinery/pkg/util/rand"
+
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
 
@@ -53,6 +59,8 @@ var log = logger.GetLogger("process", "finder", "kubernetes")
 var (
        kubepodsRegex      = 
regexp.MustCompile(`cri-containerd-(?P<Group>\w+)\.scope`)
        openShiftPodsRegex = regexp.MustCompile(`crio-(?P<Group>\w+)\.scope`)
+       ipExistTimeout     = time.Minute * 10
+       ipSearchParallel   = 10
 )
 
 type ProcessFinder struct {
@@ -73,6 +81,10 @@ type ProcessFinder struct {
 
        // runtime config
        namespaces []string
+
+       // for IsPodIP check
+       podIPChecker *cache.Expiring
+       podIPMutexes map[int]*sync.Mutex
 }
 
 func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, 
manager base.ProcessManager) error {
@@ -89,11 +101,16 @@ func (f *ProcessFinder) Init(ctx context.Context, conf 
base.FinderBaseConfig, ma
        f.stopChan = make(chan struct{}, 1)
        f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
        f.manager = manager
-       cache, err := lru.New(5000)
+       f.podIPChecker = cache.NewExpiring()
+       f.podIPMutexes = make(map[int]*sync.Mutex)
+       for i := 0; i < ipSearchParallel; i++ {
+               f.podIPMutexes[i] = &sync.Mutex{}
+       }
+       processCache, err := lru.New(5000)
        if err != nil {
                return err
        }
-       f.processCache = cache
+       f.processCache = processCache
 
        return nil
 }
@@ -277,7 +294,7 @@ func (f *ProcessFinder) getProcessCGroup(pid int32) 
([]string, error) {
        }
        defer cgroupFile.Close()
 
-       cache := make(map[string]bool)
+       cgroups := make(map[string]bool)
        scanner := bufio.NewScanner(cgroupFile)
        for scanner.Scan() {
                infos := strings.Split(scanner.Text(), ":")
@@ -295,14 +312,14 @@ func (f *ProcessFinder) getProcessCGroup(pid int32) 
([]string, error) {
                        if openShiftPod := 
openShiftPodsRegex.FindStringSubmatch(path); len(openShiftPod) >= 1 {
                                path = openShiftPod[1]
                        }
-                       cache[path] = true
+                       cgroups[path] = true
                }
        }
-       if len(cache) == 0 {
+       if len(cgroups) == 0 {
                return nil, fmt.Errorf("no cgroups")
        }
        result := make([]string, 0)
-       for k := range cache {
+       for k := range cgroups {
                result = append(result, k)
        }
        return result, nil
@@ -400,3 +417,32 @@ func (f *ProcessFinder) ShouldMonitor(pid int32) bool {
        f.manager.AddDetectedProcess(processes)
        return true
 }
+
+func (f *ProcessFinder) IsPodIP(ip string) (bool, error) {
+       val, exist := f.podIPChecker.Get(ip)
+       if exist {
+               return val.(bool), nil
+       }
+
+       // parallels the search
+       h := fnv.New32a()
+       h.Write([]byte(ip))
+       sum32 := int(h.Sum32())
+       mutex := f.podIPMutexes[sum32%ipSearchParallel]
+       mutex.Lock()
+       defer mutex.Unlock()
+
+       pods, err := f.CLI.CoreV1().Pods(v1.NamespaceAll).List(f.ctx, 
metav1.ListOptions{
+               FieldSelector: fields.OneTermEqualSelector("status.podIP", 
ip).String(),
+               Limit:         1,
+       })
+       if err != nil {
+               return false, err
+       }
+       found := len(pods.Items) > 0
+
+       // the timeout added a random value to avoid the cache avalanche
+       addedTime := time.Second * time.Duration(rand.IntnRange(10, 60))
+       f.podIPChecker.Set(ip, found, ipExistTimeout+addedTime)
+       return found, nil
+}
diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go
index dbe99c0..a252416 100644
--- a/pkg/process/finders/manager.go
+++ b/pkg/process/finders/manager.go
@@ -113,6 +113,15 @@ func (m *ProcessManager) Shutdown() error {
        return result
 }
 
+func (m *ProcessManager) Finder(finderType api.ProcessDetectType) 
(base.ProcessFinder, bool) {
+       for _, finder := range m.finders {
+               if finder.DetectType() == finderType {
+                       return finder, true
+               }
+       }
+       return nil, false
+}
+
 func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager {
        return p.moduleManager
 }
diff --git a/pkg/process/module.go b/pkg/process/module.go
index 648b8f6..95063b5 100644
--- a/pkg/process/module.go
+++ b/pkg/process/module.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/skywalking-rover/pkg/module"
        "github.com/apache/skywalking-rover/pkg/process/api"
        "github.com/apache/skywalking-rover/pkg/process/finders"
+       "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
 )
 
 const ModuleName = "process_discovery"
@@ -105,3 +106,11 @@ func (m *Module) ShouldMonitor(pid int32) bool {
 func (m *Module) NodeName() string {
        return m.config.Kubernetes.NodeName
 }
+
+func (m *Module) IsPodIP(ip string) (bool, error) {
+       k8sFinder, exist := m.manager.Finder(api.Kubernetes)
+       if !exist {
+               return false, nil
+       }
+       return k8sFinder.(*kubernetes.ProcessFinder).IsPodIP(ip)
+}
diff --git a/pkg/profiling/task/network/analyze/events/data.go 
b/pkg/profiling/task/network/analyze/events/data.go
index bc451fc..3c59378 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -40,6 +40,10 @@ type SocketDataUploadEvent struct {
        Buffer       [2048]byte
 }
 
+func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte {
+       return &s.Buffer
+}
+
 func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
        return s.Protocol0
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 1c01084..359e466 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -20,6 +20,8 @@ package layer7
 import (
        "context"
 
+       "github.com/apache/skywalking-rover/pkg/tools/buffer"
+
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        analyzeBase 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
@@ -37,7 +39,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize 
int, config *profili
 func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) 
{
        // socket buffer data
        l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
-               return &analyzeBase.SocketDataUploadEvent{}
+               return &analyzeBase.SocketDataUploadEvent{Buffer: 
*buffer.BorrowNewBuffer()}
        }, func(data interface{}) int {
                return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
        })
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..3abb43b 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -30,7 +30,6 @@ import (
        "golang.org/x/arch/arm64/arm64asm"
        "golang.org/x/arch/x86/x86asm"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/elf"
        "github.com/apache/skywalking-rover/pkg/tools/process"
 
@@ -167,7 +166,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader 
RingBufferReader, data
 
 func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader 
RingBufferReader, perCPUBuffer,
        parallels int, dataSupplier func() interface{}) {
-       rd, err := newQueueReader(emap, perCPUBuffer)
+       rd, err := perf.NewReader(emap, perCPUBuffer)
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open ring 
buffer error: %v", err))
                return
@@ -178,42 +177,53 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap 
*ebpf.Map, bufReader RingBuff
        }
        m.closers = append(m.closers, rd)
 
+       recordBuilder := newPerfRecordBuilder(dataSupplier())
        for i := 0; i < parallels; i++ {
-               m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+               m.asyncReadEvent(rd, emap, recordBuilder, dataSupplier, 
bufReader)
        }
 }
 
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier 
func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, recordPool 
*perfRecordBuilder,
+       dataSupplier func() interface{}, bufReader RingBufferReader) {
        go func() {
                for {
-                       sample, err := rd.Read()
+                       record := recordPool.GetRecord()
+                       err := rd.ReadInto(record)
                        if err != nil {
+                               recordPool.PutRecord(record)
                                if errors.Is(err, perf.ErrClosed) {
                                        return
                                }
                                log.Warnf("read from %s ringbuffer error: %v", 
emap.String(), err)
                                continue
                        }
-                       if len(sample) == 0 {
+
+                       if record.LostSamples != 0 {
+                               log.Warnf("perf event queue(%s) full, dropped 
%d samples", emap.String(), record.LostSamples)
+                               recordPool.PutRecord(record)
                                continue
                        }
 
                        data := dataSupplier()
-                       if r, ok := data.(reader.EventReader); ok {
-                               sampleReader := reader.NewReader(sample)
+                       if r, ok := data.(EventReader); ok {
+                               sampleReader := NewReader(record.RawSample)
                                r.ReadFrom(sampleReader)
                                if readErr := sampleReader.HasError(); readErr 
!= nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                                       recordPool.PutRecord(record)
                                        continue
                                }
                        } else {
-                               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                               if err := 
binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err 
!= nil {
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                                       recordPool.PutRecord(record)
                                        continue
                                }
                        }
 
                        bufReader(data)
+
+                       recordPool.PutRecord(record)
                }
        }()
 }
@@ -402,3 +412,34 @@ func (m *Linker) Close() error {
        })
        return err
 }
+
+type perfRecordBuilder struct {
+       dataSize int
+       pool     sync.Pool
+}
+
+func newPerfRecordBuilder(data interface{}) *perfRecordBuilder {
+       // added 8 bytes means fix some event not aligned
+       var size = binary.Size(data) + 8
+       if r, ok := data.(EventReader); ok {
+               reader := newSizeCalcReader()
+               r.ReadFrom(reader)
+               size = reader.Size() + 8
+       }
+
+       builder := &perfRecordBuilder{
+               dataSize: size,
+       }
+       builder.pool.New = func() any {
+               return &perf.Record{RawSample: make([]byte, 0, 
builder.dataSize)}
+       }
+       return builder
+}
+
+func (p *perfRecordBuilder) GetRecord() *perf.Record {
+       return p.pool.Get().(*perf.Record)
+}
+
+func (p *perfRecordBuilder) PutRecord(r *perf.Record) {
+       p.pool.Put(r)
+}
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index fdd8754..eff29c8 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,90 +19,16 @@ package btf
 
 import (
        "context"
-       "fmt"
        "sync"
        "time"
 
        "github.com/cilium/ebpf"
-       "github.com/cilium/ebpf/perf"
-       "github.com/cilium/ebpf/ringbuf"
 )
 
 // queueChannelReducingCountCheckInterval is the interval to check the queue 
channel reducing count
 // if the reducing count is almost full, then added a warning log
 const queueChannelReducingCountCheckInterval = time.Second * 5
 
-type queueReader interface {
-       Read() ([]byte, error)
-       Close() error
-}
-
-func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
-       switch emap.Type() {
-       case ebpf.RingBuf:
-               return newRingBufReader(emap)
-       case ebpf.PerfEventArray:
-               return newPerfQueueReader(emap, perCPUBuffer)
-       }
-       return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
-}
-
-type perfQueueReader struct {
-       name   string
-       reader *perf.Reader
-}
-
-func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, 
error) {
-       reader, err := perf.NewReader(emap, perCPUBuffer)
-       if err != nil {
-               return nil, err
-       }
-       return &perfQueueReader{reader: reader, name: emap.String()}, nil
-}
-
-func (p *perfQueueReader) Read() ([]byte, error) {
-       read, err := p.reader.Read()
-       if err != nil {
-               return nil, err
-       }
-
-       if read.LostSamples != 0 {
-               log.Warnf("perf event queue(%s) full, dropped %d samples", 
p.name, read.LostSamples)
-               return nil, nil
-       }
-
-       return read.RawSample, nil
-}
-
-func (p *perfQueueReader) Close() error {
-       return p.reader.Close()
-}
-
-type ringBufReader struct {
-       reader *ringbuf.Reader
-       name   string
-}
-
-func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
-       reader, err := ringbuf.NewReader(emap)
-       if err != nil {
-               return nil, err
-       }
-       return &ringBufReader{reader: reader, name: emap.String()}, nil
-}
-
-func (r *ringBufReader) Read() ([]byte, error) {
-       read, err := r.reader.Read()
-       if err != nil {
-               return nil, err
-       }
-       return read.RawSample, nil
-}
-
-func (r *ringBufReader) Close() error {
-       return r.reader.Close()
-}
-
 type PartitionContext interface {
        Start(ctx context.Context)
        Consume(data interface{})
diff --git a/pkg/tools/btf/reader/reader.go b/pkg/tools/btf/reader.go
similarity index 62%
rename from pkg/tools/btf/reader/reader.go
rename to pkg/tools/btf/reader.go
index d2e980e..5cae4d5 100644
--- a/pkg/tools/btf/reader/reader.go
+++ b/pkg/tools/btf/reader.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package reader
+package btf
 
 import (
        "encoding/binary"
@@ -25,11 +25,20 @@ import (
 // EventReader read the sample data by self, instant of binary.Read
 type EventReader interface {
        // ReadFrom read buffer data
-       ReadFrom(reader *Reader)
+       ReadFrom(reader Reader)
 }
 
-// Reader buffer sample reader
-type Reader struct {
+type Reader interface {
+       HasError() error
+       ReadUint64() uint64
+       ReadUint32() uint32
+       ReadUint16() uint16
+       ReadUint8() uint8
+       ReadUint8Array(a []uint8, size int)
+}
+
+// BytesReader buffer sample reader
+type BytesReader struct {
        Sample        []byte
        CurrentOffset int
        sampleLen     int
@@ -37,8 +46,8 @@ type Reader struct {
 }
 
 // NewReader create a reader from BPF buffer
-func NewReader(sample []byte) *Reader {
-       return &Reader{
+func NewReader(sample []byte) Reader {
+       return &BytesReader{
                Sample:        sample,
                CurrentOffset: 0,
                sampleLen:     len(sample),
@@ -46,11 +55,11 @@ func NewReader(sample []byte) *Reader {
 }
 
 // HasError is there have error when reading buffer
-func (r *Reader) HasError() error {
+func (r *BytesReader) HasError() error {
        return r.err
 }
 
-func (r *Reader) ReadUint64() uint64 {
+func (r *BytesReader) ReadUint64() uint64 {
        bytes, err := r.read(8)
        if err != nil {
                return 0
@@ -58,7 +67,7 @@ func (r *Reader) ReadUint64() uint64 {
        return binary.LittleEndian.Uint64(bytes)
 }
 
-func (r *Reader) ReadUint32() uint32 {
+func (r *BytesReader) ReadUint32() uint32 {
        bytes, err := r.read(4)
        if err != nil {
                return 0
@@ -66,7 +75,7 @@ func (r *Reader) ReadUint32() uint32 {
        return binary.LittleEndian.Uint32(bytes)
 }
 
-func (r *Reader) ReadUint16() uint16 {
+func (r *BytesReader) ReadUint16() uint16 {
        bytes, err := r.read(2)
        if err != nil {
                return 0
@@ -74,7 +83,7 @@ func (r *Reader) ReadUint16() uint16 {
        return binary.LittleEndian.Uint16(bytes)
 }
 
-func (r *Reader) ReadUint8() uint8 {
+func (r *BytesReader) ReadUint8() uint8 {
        bytes, err := r.read(1)
        if err != nil {
                return 0
@@ -82,7 +91,7 @@ func (r *Reader) ReadUint8() uint8 {
        return bytes[0]
 }
 
-func (r *Reader) ReadUint8Array(a []uint8, size int) {
+func (r *BytesReader) ReadUint8Array(a []uint8, size int) {
        read, err := r.read(size)
        if err != nil {
                return
@@ -90,7 +99,7 @@ func (r *Reader) ReadUint8Array(a []uint8, size int) {
        copy(a, read)
 }
 
-func (r *Reader) read(size int) ([]byte, error) {
+func (r *BytesReader) read(size int) ([]byte, error) {
        if r.err != nil {
                return nil, r.err
        }
@@ -103,3 +112,43 @@ func (r *Reader) read(size int) ([]byte, error) {
        r.CurrentOffset += size
        return bytes, nil
 }
+
+type sizeCalcReader struct {
+       size int
+}
+
+func newSizeCalcReader() *sizeCalcReader {
+       return &sizeCalcReader{}
+}
+
+func (r *sizeCalcReader) HasError() error {
+       return nil
+}
+
+func (r *sizeCalcReader) ReadUint64() uint64 {
+       r.size += 8
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint32() uint32 {
+       r.size += 4
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint16() uint16 {
+       r.size += 2
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint8() uint8 {
+       r.size++
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint8Array(a []uint8, size int) {
+       r.size += size
+}
+
+func (r *sizeCalcReader) Size() int {
+       return r.size
+}
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 368e1f6..33f6d1c 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -37,8 +37,18 @@ var (
        emptyList      = list.New()
 
        log = logger.GetLogger("tools", "buffer")
+
+       PooledBuffer = sync.Pool{
+               New: func() any {
+                       return &[2048]byte{}
+               },
+       }
 )
 
+func BorrowNewBuffer() *[2048]byte {
+       return PooledBuffer.Get().(*[2048]byte)
+}
+
 type SocketDataBuffer interface {
        // Protocol of the buffer
        Protocol() enums.ConnectionProtocol
@@ -71,6 +81,8 @@ type SocketDataBuffer interface {
        StartTime() uint64
        // EndTime the data end timestamp
        EndTime() uint64
+
+       ReleaseBuffer() *[2048]byte
 }
 
 type SocketDataDetail interface {
@@ -259,60 +271,19 @@ func (r *Buffer) Clean() {
 // nolint
 func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
        dataEvents := list.New()
-       detailEvents := list.New()
-       var firstDetailElement *list.Element
-       var lastBufferDataID = start.DataID()
        for nextElement := start.element; nextElement != end.element; 
nextElement = nextElement.Next() {
                if nextElement == nil || nextElement.Value == nil {
                        break
                }
                currentBuffer := nextElement.Value.(SocketDataBuffer)
-               // found first matches detail event
-               if detailEvents.Len() == 0 || firstDetailElement == nil {
-                       for e := r.detailEvents.Front(); e != nil; e = e.Next() 
{
-                               if e.Value == nil {
-                                       continue
-                               }
-                               if e.Value.(SocketDataDetail).DataID() >= 
currentBuffer.DataID() {
-                                       detailEvents.PushBack(e.Value)
-                                       firstDetailElement = e
-                                       break
-                               }
-                       }
-               }
                dataEvents.PushBack(currentBuffer)
-               lastBufferDataID = currentBuffer.DataID()
        }
        lastBuffer := end.element.Value.(SocketDataBuffer)
        dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer: 
lastBuffer, Size: end.bufIndex})
 
-       // if the first detail element been found, append the details until the 
last buffer data id
-       var lastBufferID = lastBufferDataID
-       if lastBuffer != nil {
-               lastBufferID = lastBuffer.DataID()
-       }
-       if firstDetailElement == nil && r.detailEvents != nil {
-               for e := r.detailEvents.Front(); e != nil; e = e.Next() {
-                       if e.Value != nil && 
e.Value.(SocketDataDetail).DataID() == lastBufferID {
-                               detailEvents.PushBack(e.Value)
-                               break
-                       }
-               }
-       } else if firstDetailElement != nil && 
firstDetailElement.Value.(SocketDataDetail).DataID() != lastBufferID {
-               for tmp := firstDetailElement.Next(); tmp != nil; tmp = 
tmp.Next() {
-                       if tmp.Value == nil {
-                               continue
-                       }
-                       if tmp.Value.(SocketDataDetail).DataID() > lastBufferID 
{
-                               break
-                       }
-                       detailEvents.PushBack(tmp.Value)
-               }
-       }
-
        return &Buffer{
                dataEvents:     dataEvents,
-               detailEvents:   detailEvents,
+               detailEvents:   emptyList,
                validated:      validated,
                head:           &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
                current:        &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
@@ -347,6 +318,9 @@ func (r *Buffer) BuildDetails() *list.List {
                }
 
                for e := r.originalBuffer.detailEvents.Front(); e != nil; e = 
e.Next() {
+                       if e.Value == nil {
+                               continue
+                       }
                        if e.Value.(SocketDataDetail).DataID() >= fromDataID && 
e.Value.(SocketDataDetail).DataID() <= endDataID {
                                events.PushBack(e.Value)
                        }
@@ -358,7 +332,7 @@ func (r *Buffer) BuildDetails() *list.List {
                                        dataIDList = append(dataIDList, 
e.Value.(SocketDataDetail).DataID())
                                }
                        }
-                       log.Debugf("cannot found details from original buffer, 
from data id: %d, end data id: %d, "+
+                       log.Infof("cannot found details from original buffer, 
from data id: %d, end data id: %d, "+
                                "ref: %p, existing details data id list: %v", 
fromDataID, endDataID, r.originalBuffer, dataIDList)
                }
 
@@ -409,6 +383,20 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
        return r.dataEvents.Back().Value.(SocketDataBuffer)
 }
 
+func (r *Buffer) TotalBuffer() []SocketDataBuffer {
+       if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
+               return nil
+       }
+       result := make([]SocketDataBuffer, 0, r.dataEvents.Len())
+       for e := r.dataEvents.Front(); e != nil; e = e.Next() {
+               if e.Value == nil {
+                       continue
+               }
+               result = append(result, e.Value.(SocketDataBuffer))
+       }
+       return result
+}
+
 // DetectNotSendingLastPosition detect the buffer contains not sending data: 
the BPF limited socket data count
 func (r *Buffer) DetectNotSendingLastPosition() *Position {
        if r == nil || r.dataEvents.Len() == 0 {
@@ -642,7 +630,7 @@ func (r *Buffer) PrepareForReading() bool {
        if r.shouldResetPosition {
                r.ResetForLoopReading()
                r.shouldResetPosition = false
-               return false
+               return r.PrepareForReading()
        }
        if r.head == nil || r.head.element == nil {
                // read in the first element
@@ -732,6 +720,11 @@ func (r *Buffer) removeElement0(element *list.Element) 
*list.Element {
        }
        result := element.Next()
        r.dataEvents.Remove(element)
+       if element.Value != nil {
+               if b, ok := element.Value.(SocketDataBuffer); ok && b != nil {
+                       PooledBuffer.Put(b.ReleaseBuffer())
+               }
+       }
        return result
 }
 
@@ -819,10 +812,14 @@ func (r *Buffer) DeleteExpireEvents(expireDuration 
time.Duration) int {
        expireTime := time.Now().Add(-expireDuration)
        // data event queue
        count := r.deleteEventsWithJudgement(r.dataEvents, func(element 
*list.Element) bool {
+               if element.Value == nil {
+                       return true
+               }
                buffer := element.Value.(SocketDataBuffer)
                startTime := host.Time(buffer.StartTime())
                if expireTime.After(startTime) {
                        r.latestExpiredDataID = buffer.DataID()
+                       PooledBuffer.Put(buffer.ReleaseBuffer())
                        return true
                }
                return false
diff --git a/pkg/tools/ip.go b/pkg/tools/ip.go
index 24305b9..b3a34fa 100644
--- a/pkg/tools/ip.go
+++ b/pkg/tools/ip.go
@@ -38,7 +38,7 @@ func DefaultHostIPAddress() string {
 
 // HostIPAddressV4 found the IPV4 address from appoint net interface name
 func HostIPAddressV4(name string) string {
-       address := host.ipAddresses[name]
+       address := host.ipAddressesByName[name]
        if address == nil {
                return ""
        }
@@ -47,7 +47,7 @@ func HostIPAddressV4(name string) string {
 
 // HostIPAddressV6 found the IPV6 address from appoint net interface name
 func HostIPAddressV6(name string) string {
-       address := host.ipAddresses[name]
+       address := host.ipAddressesByName[name]
        if address == nil {
                return ""
        }
@@ -56,10 +56,8 @@ func HostIPAddressV6(name string) string {
 
 // IsLocalHostAddress is the address from local
 func IsLocalHostAddress(address string) bool {
-       for _, h := range host.ipAddresses {
-               if h.ipV4 == address || h.ipV6 == address {
-                       return true
-               }
+       if host.ipAddressesByIP[address] {
+               return true
        }
        return address == "0.0.0.0"
 }
@@ -73,8 +71,9 @@ type hostInfo struct {
        // hostname
        name string
        // ip address
-       ipAddresses   map[string]*hostIPAddress
-       defaultIPAddr string
+       ipAddressesByName map[string]*hostIPAddress
+       ipAddressesByIP   map[string]bool
+       defaultIPAddr     string
 }
 
 type hostIPAddress struct {
@@ -83,7 +82,7 @@ type hostIPAddress struct {
 }
 
 func queryHostInfo() *hostInfo {
-       addresses, def, err := localIPAddress0()
+       addressesByName, def, err := localIPAddress0()
        if err != nil {
                panic(err)
        }
@@ -91,7 +90,12 @@ func queryHostInfo() *hostInfo {
        if err != nil {
                panic(err)
        }
-       return &hostInfo{name: name, ipAddresses: addresses, defaultIPAddr: def}
+       addressesByIP := make(map[string]bool)
+       for _, addr := range addressesByName {
+               addressesByIP[addr.ipV4] = true
+               addressesByIP[addr.ipV6] = true
+       }
+       return &hostInfo{name: name, ipAddressesByName: addressesByName, 
ipAddressesByIP: addressesByIP, defaultIPAddr: def}
 }
 
 func hostname0() (string, error) {
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 72dc1b9..3e3f934 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -18,14 +18,15 @@
 package ip
 
 import (
+       "fmt"
        "net"
        "syscall"
 
        "github.com/florianl/go-conntrack"
 
-       "github.com/apache/skywalking-rover/pkg/logger"
-
        "golang.org/x/sys/unix"
+
+       "github.com/apache/skywalking-rover/pkg/logger"
 )
 
 var log = logger.GetLogger("tools", "ip")
@@ -47,10 +48,11 @@ func NewConnTrack() (*ConnTrack, error) {
        if err != nil {
                return nil, err
        }
+
        return &ConnTrack{tracker: nfct}, nil
 }
 
-func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) bool {
+func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error {
        family := conntrack.IPv4
        if addr.Family == unix.AF_INET6 {
                family = conntrack.IPv6
@@ -64,17 +66,18 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) 
bool {
                session, e := c.tracker.Get(conntrack.Conntrack, family, 
conntrack.Con{Origin: tuple})
                if e != nil {
                        // try to get the reply session, if the info not exists 
or from accept events, have error is normal
-                       log.Debugf("cannot get the conntrack session, type: %s, 
family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v", info.name,
+                       return fmt.Errorf("cannot get the conntrack session, 
type: %s, family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v", 
info.name,
                                family, tuple.Src, *tuple.Proto.SrcPort, 
tuple.Dst, *tuple.Proto.DstPort, e)
-                       continue
                }
 
                if res := c.filterValidateReply(session, tuple); res != nil {
                        addr.DestIP = res.Src.String()
-                       return true
+                       addr.NeedConnTrack = false
+                       log.Debugf("update real peer address from conntrack: 
%s:%d", addr.DestIP, addr.DestPort)
+                       return nil
                }
        }
-       return false
+       return nil
 }
 
 func (c *ConnTrack) parseSocketToTuple(addr *SocketPair) *conntrack.IPTuple {
diff --git a/pkg/tools/ip/tcpresolver.go b/pkg/tools/ip/tcpresolver.go
index a07c216..9f09309 100644
--- a/pkg/tools/ip/tcpresolver.go
+++ b/pkg/tools/ip/tcpresolver.go
@@ -34,6 +34,8 @@ type SocketPair struct {
        SrcPort  uint16
        DestIP   string
        DestPort uint16
+
+       NeedConnTrack bool
 }
 
 func (s *SocketPair) IsValid() bool {

Reply via email to