This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch access_log_improvement
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit d62cbdd953d0ee9a1d96dde97aa11bde137bfcab
Author: mrproliu <[email protected]>
AuthorDate: Tue Dec 17 16:07:42 2024 +0800

    Improve the performance of access log module
---
 CHANGES.md                                         |   5 +-
 bpf/accesslog/common/connection.h                  |   6 +-
 bpf/include/queue.h                                |  26 +--
 bpf/include/socket_data.h                          |  29 ++-
 pkg/accesslog/collector/connection.go              |   2 +-
 pkg/accesslog/collector/protocols/http1.go         |  29 ++-
 pkg/accesslog/collector/protocols/queue.go         |  22 +-
 pkg/accesslog/collector/tls.go                     |  13 ++
 pkg/accesslog/common/connection.go                 |  98 +++++++--
 pkg/accesslog/events/data.go                       |   6 +
 pkg/accesslog/runner.go                            | 161 ++------------
 pkg/accesslog/sender/logs.go                       | 107 ++++++++++
 pkg/accesslog/sender/sender.go                     | 234 +++++++++++++++++++++
 pkg/logger/settings.go                             |   4 +-
 pkg/profiling/task/network/analyze/events/data.go  |   5 +
 .../task/network/analyze/layer7/events.go          |   2 +-
 .../layer7/protocols/http1/reader/request.go       |   7 +
 pkg/tools/btf/ebpf.go                              |   1 -
 pkg/tools/btf/linker.go                            |   8 +
 pkg/tools/btf/queue.go                             |  79 +++----
 pkg/tools/buffer/buffer.go                         |  31 +++
 pkg/tools/ip/conntrack.go                          |  19 +-
 22 files changed, 618 insertions(+), 276 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a27176d..8cb3d99 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,7 +16,10 @@ Release Notes.
 * Support parallel parsing protocol data in the access log module.
 * Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
 * Reduce missing details issue in the access log module.
-* Introduce ringbuf queue to improve performance in the access log module.
+* Improve HTTP/1.x protocol parsing strategy to handle missing data.
+* Add gRPC sender to send the access log to the backend.
+* Add warning log when the event queue is almost full in the access log module.
+* Reduce unnecessary `conntrack` queries when detecting new connections.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/bpf/accesslog/common/connection.h 
b/bpf/accesslog/common/connection.h
index cb799e8..880826d 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -22,6 +22,7 @@
 #include "data_args.h"
 #include "socket_opts.h"
 #include "queue.h"
+#include "socket_data.h"
 
 // syscall:connect
 struct connect_args_t {
@@ -107,7 +108,7 @@ struct socket_connect_event_t {
     __u64 conntrack_upstream_iph;
     __u32 conntrack_upstream_port;
 };
-DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
+DATA_QUEUE(socket_connection_event_queue);
 
 // active connection cached into the hashmap
 // if connection closed, then deleted
@@ -159,7 +160,7 @@ struct socket_close_event_t {
     // close success
     __u32 success;
 };
-DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
+DATA_QUEUE(socket_close_event_queue);
 
 static __inline bool family_should_trace(const __u32 family) {
     return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? 
false : true;
@@ -303,6 +304,7 @@ static __inline void submit_connection_when_not_exists(void 
*ctx, __u64 id, stru
 }
 
 static __inline void notify_close_connection(void* ctx, __u64 conid, struct 
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
+    bpf_map_delete_elem(&socket_data_last_id_map, &conid);
     struct socket_close_event_t *close_event;
     close_event = rover_reserve_buf(&socket_close_event_queue, 
sizeof(*close_event));
     if (close_event == NULL) {
diff --git a/bpf/include/queue.h b/bpf/include/queue.h
index c6b532c..559aeb4 100644
--- a/bpf/include/queue.h
+++ b/bpf/include/queue.h
@@ -19,12 +19,10 @@
 
 #include "api.h"
 
-#define DATA_QUEUE(name, size)               \
-       struct {                                    \
-               __uint(type, BPF_MAP_TYPE_RINGBUF); \
-               __uint(max_entries, size);          \
-       } name SEC(".maps");                        \
-       const void *rover_data_queue_##name __attribute__((unused));
+#define DATA_QUEUE(name)                            \
+       struct {                                        \
+               __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);\
+       } name SEC(".maps");
 
 struct {
        __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
@@ -36,26 +34,12 @@ struct {
 static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
        static const int zero = 0;
 
-       if (bpf_core_enum_value_exists(enum bpf_func_id,
-                                      BPF_FUNC_ringbuf_reserve))
-               return bpf_ringbuf_reserve(map, size, 0);
-
        return bpf_map_lookup_elem(&rover_data_heap, &zero);
 }
 
-static __always_inline void rover_discard_buf(void *buf)
-{
-       if (bpf_core_enum_value_exists(enum bpf_func_id,
-                                      BPF_FUNC_ringbuf_discard))
-               bpf_ringbuf_discard(buf, 0);
+static __always_inline void rover_discard_buf(void *buf) {
 }
 
 static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, 
__u64 size) {
-       if (bpf_core_enum_value_exists(enum bpf_func_id,
-                                      BPF_FUNC_ringbuf_submit)) {
-               bpf_ringbuf_submit(buf, 0);
-               return 0;
-       }
-
        return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
 }
\ No newline at end of file
diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h
index 0d62ccb..248b34c 100644
--- a/bpf/include/socket_data.h
+++ b/bpf/include/socket_data.h
@@ -35,6 +35,7 @@ struct socket_data_upload_event {
     __u64 conid;
     __u64 randomid;
     __u64 data_id;
+    __u64 prev_data_id;
     __u64 total_size;
     char buffer[MAX_TRANSMIT_SOCKET_READ_LENGTH];
 };
@@ -44,7 +45,7 @@ struct {
     __type(value, struct socket_data_upload_event);
     __uint(max_entries, 1);
 } socket_data_upload_event_per_cpu_map SEC(".maps");
-DATA_QUEUE(socket_data_upload_queue, 1024 * 1024);
+DATA_QUEUE(socket_data_upload_queue);
 
 struct socket_data_sequence_t {
     __u64 data_id;
@@ -81,6 +82,7 @@ struct upload_data_args {
     __u64 random_id;
 
     __u64 socket_data_id;
+    __u64 prev_socket_data_id;
     struct iovec *socket_data_iovec;
     size_t socket_data_iovlen;
     ssize_t bytes_count;
@@ -129,11 +131,13 @@ static __always_inline void 
__upload_socket_data_with_buffer(void *ctx, __u8 ind
     socket_data_event->randomid = args->random_id;
     socket_data_event->total_size = args->bytes_count;
     socket_data_event->data_id = args->socket_data_id;
+    socket_data_event->prev_data_id = args->prev_socket_data_id;
 
     socket_data_event->sequence = index;
     socket_data_event->data_len = size;
     socket_data_event->finished = is_finished;
     socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
+    asm volatile("%[size] &= 0x7ff;\n" ::[size] "+r"(size) :);
     bpf_probe_read(&socket_data_event->buffer, size, buf);
     rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, 
sizeof(*socket_data_event));
 }
@@ -208,15 +212,38 @@ static __always_inline void upload_socket_data_iov(void 
*ctx, struct iovec* iov,
     UPLOAD_PER_SOCKET_DATA_IOV();
 }
 
+struct socket_data_last_id_t {
+    __u64 random_id;
+    __u64 socket_data_id;
+};
+struct {
+       __uint(type, BPF_MAP_TYPE_LRU_HASH);
+       __uint(max_entries, 10000);
+       __type(key, __u64);
+       __type(value, struct socket_data_last_id_t);
+} socket_data_last_id_map SEC(".maps");
+
 static __inline void upload_socket_data(void *ctx, struct upload_data_args 
*args) {
     // must have protocol and ssl must same(plain)
     // if the connection data is needs to skip upload, then skip
     if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || 
args->connection_ssl != args->socket_data_ssl || 
args->connection_skip_data_upload == 1) {
         return;
     }
+    struct socket_data_last_id_t *latest = 
bpf_map_lookup_elem(&socket_data_last_id_map, &args->con_id);
+    args->prev_socket_data_id = 0;
+    if (latest != NULL && latest->random_id == args->random_id) {
+        args->prev_socket_data_id = latest->socket_data_id;
+    }
     if (args->socket_data_buf != NULL) {
         upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, 
args, args->socket_ssl_buffer_force_unfinished);
     } else if (args->socket_data_iovec != NULL) {
         upload_socket_data_iov(ctx, args->socket_data_iovec, 
args->socket_data_iovlen, args->bytes_count, args);
     }
+
+    if (latest == NULL || latest->socket_data_id != args->socket_data_id) {
+        struct socket_data_last_id_t data = {};
+        data.random_id = args->random_id;
+        data.socket_data_id = args->socket_data_id;
+        bpf_map_update_elem(&socket_data_last_id_map, &args->con_id, &data, 
BPF_ANY);
+    }
 }
\ No newline at end of file
diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index a5565a1..d861447 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -77,7 +77,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        if err != nil {
                connectionLogger.Warnf("cannot create the connection tracker, 
%v", err)
        }
-       c.eventQueue = 
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
+       c.eventQueue = btf.NewEventQueue("connection resolver", 
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
                ctx.Config.ConnectionAnalyze.QueueSize, func(num int) 
btf.PartitionContext {
                        return newConnectionPartitionContext(ctx, track)
                })
diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index 9c17160..227422f 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -90,9 +90,6 @@ func (p *HTTP1Protocol) Analyze(connection 
*PartitionConnection, _ *AnalyzeHelpe
                }
 
                messageType, err := p.reader.IdentityMessageType(buf)
-               log.Debugf("ready to reading message type, messageType: %v, 
buf: %p, data id: %d, "+
-                       "connection ID: %d, random ID: %d, error: %v", 
messageType, buf, buf.Position().DataID(),
-                       metrics.ConnectionID, metrics.RandomID, err)
                if err != nil {
                        http1Log.Debugf("failed to identity message type, %v", 
err)
                        if buf.SkipCurrentElement() {
@@ -115,6 +112,9 @@ func (p *HTTP1Protocol) Analyze(connection 
*PartitionConnection, _ *AnalyzeHelpe
                                metrics.ConnectionID, metrics.RandomID, 
buf.Position().DataID(), err)
                }
 
+               http1Log.Debugf("readed message, messageType: %v, buf: %p, data 
id: %d, "+
+                       "connection ID: %d, random ID: %d, metrics : %p, handle 
result: %d",
+                       messageType, buf, buf.Position().DataID(), 
metrics.ConnectionID, metrics.RandomID, metrics, result)
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
@@ -149,13 +149,13 @@ func (p *HTTP1Protocol) handleRequest(metrics 
*HTTP1Metrics, buf *buffer.Buffer)
 }
 
 func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b 
*buffer.Buffer) (enums.ParseResult, error) {
-       firstRequest := metrics.halfRequests.Front()
-       if firstRequest == nil {
-               log.Debugf("cannot found request for response, skip response, 
connection ID: %d, random ID: %d",
-                       metrics.ConnectionID, metrics.RandomID)
+       request := metrics.findMatchesRequest(b.Position().DataID(), 
b.Position().PrevDataID())
+       if request == nil {
+               log.Debugf("cannot found request for response, skip response, 
connection ID: %d, random ID: %d, "+
+                       "required prev data id: %d, current data id: %d",
+                       metrics.ConnectionID, metrics.RandomID, 
b.Position().PrevDataID(), b.Position().DataID())
                return enums.ParseResultSkipPackage, nil
        }
-       request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
 
        // parsing response
        response, result, err := p.reader.ReadResponse(request, b, true)
@@ -286,3 +286,16 @@ func (m *HTTP1Metrics) appendRequestToList(req 
*reader.Request) {
                m.halfRequests.PushBack(req)
        }
 }
+
+func (m *HTTP1Metrics) findMatchesRequest(currentDataID, prevDataID uint64) 
*reader.Request {
+       for element := m.halfRequests.Front(); element != nil; element = 
element.Next() {
+               req := element.Value.(*reader.Request)
+               // if the tail data id of request is equals to the prev data id 
of response
+               // or tail request data id+1==first response data id, then 
return the request
+               if uint64(req.MaxDataID()) == prevDataID || 
uint64(req.MaxDataID()+1) == currentDataID {
+                       m.halfRequests.Remove(element)
+                       return req
+               }
+       }
+       return nil
+}
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index 7e07eb3..c9789b7 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -88,7 +88,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) 
(*AnalyzeQueue, error) {
 }
 
 func (q *AnalyzeQueue) Start(ctx context.Context) {
-       q.eventQueue = 
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, 
q.context.Config.ProtocolAnalyze.QueueSize,
+       q.eventQueue = btf.NewEventQueue("socket data analyzer",
+               q.context.Config.ProtocolAnalyze.AnalyzeParallels, 
q.context.Config.ProtocolAnalyze.QueueSize,
                func(num int) btf.PartitionContext {
                        return NewPartitionContext(q.context, num, 
q.supportAnalyzers(q.context))
                })
@@ -128,12 +129,13 @@ type PartitionContext struct {
 func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID 
uint64,
        protocol enums.ConnectionProtocol, currentDataID uint64) 
*PartitionConnection {
        connection := &PartitionConnection{
-               connectionID:     conID,
-               randomID:         randomID,
-               dataBuffers:      
make(map[enums.ConnectionProtocol]*buffer.Buffer),
-               protocol:         make(map[enums.ConnectionProtocol]uint64),
-               protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
-               protocolMetrics:  
make(map[enums.ConnectionProtocol]ProtocolMetrics),
+               connectionID:       conID,
+               randomID:           randomID,
+               dataBuffers:        
make(map[enums.ConnectionProtocol]*buffer.Buffer),
+               protocol:           make(map[enums.ConnectionProtocol]uint64),
+               protocolAnalyzer:   make(map[enums.ConnectionProtocol]Protocol),
+               protocolMetrics:    
make(map[enums.ConnectionProtocol]ProtocolMetrics),
+               lastCheckCloseTime: time.Now(),
        }
        connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, 
currentDataID)
        return connection
@@ -173,7 +175,6 @@ func (p *PartitionContext) OnConnectionClose(event 
*events.SocketCloseEvent, clo
        }
        connection := conn.(*PartitionConnection)
        connection.closeCallback = closeCallback
-       connection.closed = true
        log.Debugf("receive the connection close event and mark is closable, 
connection ID: %d, random ID: %d, partition number: %d",
                event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
 }
@@ -227,8 +228,9 @@ func (p *PartitionContext) Consume(data interface{}) {
                connection.AppendDetail(p.context, event)
        case *events.SocketDataUploadEvent:
                pid, _ := events.ParseConnectionID(event.ConnectionID)
-               log.Debugf("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
-                       event.ConnectionID, event.RandomID, pid, event.DataID0, 
event.Sequence0, event.Protocol0)
+               log.Debugf("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, prev data id: %d, "+
+                       "data id: %d, sequence: %d, protocol: %d",
+                       event.ConnectionID, event.RandomID, pid, 
event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
                connection := p.getConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol0, event.DataID0)
                connection.AppendData(event)
        }
diff --git a/pkg/accesslog/collector/tls.go b/pkg/accesslog/collector/tls.go
index 41c8811..0d14a58 100644
--- a/pkg/accesslog/collector/tls.go
+++ b/pkg/accesslog/collector/tls.go
@@ -18,6 +18,8 @@
 package collector
 
 import (
+       "sync"
+
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/module"
@@ -33,6 +35,7 @@ type TLSCollector struct {
        context            *common.AccessLogContext
        monitoredProcesses map[int32]bool
        linker             *btf.Linker
+       mutex              sync.Mutex
 }
 
 func NewTLSCollector() *TLSCollector {
@@ -57,6 +60,14 @@ func (c *TLSCollector) Stop() {
 }
 
 func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
+       go func() {
+               c.addProcess(pid)
+       }()
+}
+
+func (c *TLSCollector) addProcess(pid int32) {
+       c.mutex.Lock()
+       defer c.mutex.Unlock()
        if _, ok := c.monitoredProcesses[pid]; ok {
                return
        }
@@ -83,5 +94,7 @@ func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
 }
 
 func (c *TLSCollector) OnProcessRemoved(pid int32) {
+       c.mutex.Lock()
+       defer c.mutex.Unlock()
        delete(c.monitoredProcesses, pid)
 }
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 7c0e992..e948272 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -57,6 +57,9 @@ const (
 
        // in case the reading the data from BPF queue is disordered, so add a 
delay time to delete the connection information
        connectionDeleteDelayTime = time.Second * 20
+
+       // the connection check exist time
+       connectionCheckExistTime = time.Second * 30
 )
 
 type addressProcessType int
@@ -144,13 +147,14 @@ type addressInfo struct {
 }
 
 type ConnectionInfo struct {
-       ConnectionID  uint64
-       RandomID      uint64
-       RPCConnection *v3.AccessLogConnection
-       MarkDeletable bool
-       PID           uint32
-       Socket        *ip.SocketPair
-       DeleteAfter   *time.Time
+       ConnectionID       uint64
+       RandomID           uint64
+       RPCConnection      *v3.AccessLogConnection
+       MarkDeletable      bool
+       PID                uint32
+       Socket             *ip.SocketPair
+       LastCheckExistTime time.Time
+       DeleteAfter        *time.Time
 }
 
 func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader 
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
@@ -192,8 +196,10 @@ func (c *ConnectionManager) Start(ctx context.Context, 
accessLogContext *AccessL
 
                                        // if the connection is not existed, 
then delete it
                                        if err := 
c.activeConnectionMap.Delete(conID); err != nil {
-                                               log.Warnf("failed to delete the 
active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: 
%v",
-                                                       pid, fd, conID, 
activateConn.RandomID, err)
+                                               if !errors.Is(err, 
ebpf.ErrKeyNotExist) {
+                                                       log.Warnf("failed to 
delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: 
%d, error: %v",
+                                                               pid, fd, conID, 
activateConn.RandomID, err)
+                                               }
                                                continue
                                        }
                                        log.Debugf("deleted the active 
connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, 
random ID: %d",
@@ -318,17 +324,20 @@ func (c *ConnectionManager) 
connectionPostHandle(connection *ConnectionInfo, eve
                        c.allUnfinishedConnections[fmt.Sprintf("%d_%d", 
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
                }
        case events.SocketDetail:
+               tlsMode := connection.RPCConnection.TlsMode
+               protocol := connection.RPCConnection.Protocol
                if e.GetSSL() == 1 && connection.RPCConnection.TlsMode == 
v3.AccessLogConnectionTLSMode_Plain {
-                       connection.RPCConnection.TlsMode = 
v3.AccessLogConnectionTLSMode_TLS
+                       tlsMode = v3.AccessLogConnectionTLSMode_TLS
                }
                if e.GetProtocol() != enums.ConnectionProtocolUnknown && 
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
                        switch e.GetProtocol() {
                        case enums.ConnectionProtocolHTTP:
-                               connection.RPCConnection.Protocol = 
v3.AccessLogProtocolType_HTTP_1
+                               protocol = v3.AccessLogProtocolType_HTTP_1
                        case enums.ConnectionProtocolHTTP2:
-                               connection.RPCConnection.Protocol = 
v3.AccessLogProtocolType_HTTP_2
+                               protocol = v3.AccessLogProtocolType_HTTP_2
                        }
                }
+               c.rebuildRPCConnectionWithTLSModeAndProtocol(connection, 
tlsMode, protocol)
        }
 
        // notify all flush listeners the connection is ready to flush
@@ -337,6 +346,22 @@ func (c *ConnectionManager) 
connectionPostHandle(connection *ConnectionInfo, eve
        }
 }
 
+// According to https://github.com/golang/protobuf/issues/1609
+// if the message is modified during marshalling, it may cause the error when 
send the message to the backend
+// so, we need to clone the message and change it before sending it to the 
channel
+func (c *ConnectionManager) 
rebuildRPCConnectionWithTLSModeAndProtocol(connection *ConnectionInfo,
+       tls v3.AccessLogConnectionTLSMode, protocol v3.AccessLogProtocolType) {
+       original := connection.RPCConnection
+       connection.RPCConnection = &v3.AccessLogConnection{
+               Local:      original.Local,
+               Remote:     original.Remote,
+               Role:       original.Role,
+               TlsMode:    tls,
+               Protocol:   protocol,
+               Attachment: original.Attachment,
+       }
+}
+
 func (c *ConnectionManager) ProcessIsMonitor(pid uint32) bool {
        c.monitoringProcessLock.RLock()
        defer c.monitoringProcessLock.RUnlock()
@@ -360,11 +385,12 @@ func (c *ConnectionManager) buildConnection(event 
*events.SocketConnectEvent, so
                Protocol: v3.AccessLogProtocolType_TCP,
        }
        return &ConnectionInfo{
-               ConnectionID:  event.ConID,
-               RandomID:      event.RandomID,
-               RPCConnection: connection,
-               PID:           event.PID,
-               Socket:        socket,
+               ConnectionID:       event.ConID,
+               RandomID:           event.RandomID,
+               RPCConnection:      connection,
+               PID:                event.PID,
+               Socket:             socket,
+               LastCheckExistTime: time.Now(),
        }
 }
 
@@ -503,6 +529,15 @@ func (c *ConnectionManager) AddNewProcess(pid int32, 
entities []api.ProcessInter
        defer c.monitoringProcessLock.Unlock()
 
        // adding monitoring process and IP addresses
+       var entity *api.ProcessEntity
+       if entities != nil && len(entities) > 0 {
+               entity = entities[0].Entity()
+       }
+       log.Infof("adding monitoring process, pid: %d, entity: %v", pid, entity)
+       if _, ok := c.monitoringProcesses[pid]; ok {
+               log.Infof("the process %d already monitoring, so no needs to 
add again", pid)
+               return
+       }
        c.monitoringProcesses[pid] = monitorProcesses
        c.updateMonitorStatusForProcess(pid, true)
        for _, entity := range monitorProcesses {
@@ -545,6 +580,30 @@ func (c *ConnectionManager) 
shouldMonitorProcesses(entities []api.ProcessInterfa
        return c.monitorFilter.ShouldIncludeProcesses(entities)
 }
 
+func (c *ConnectionManager) checkConnectionIsExist(con *ConnectionInfo) bool {
+       // skip the check if the check time is not reach
+       if time.Since(con.LastCheckExistTime) < connectionCheckExistTime {
+               return true
+       }
+       con.LastCheckExistTime = time.Now()
+       var activateConn ActiveConnection
+       c.activeConnectionMap.Lookup(con.ConnectionID, &activateConn)
+       if err := c.activeConnectionMap.Lookup(con.ConnectionID, 
&activateConn); err != nil {
+               if errors.Is(err, ebpf.ErrKeyNotExist) {
+                       con.MarkDeletable = true
+                       return false
+               }
+               log.Warnf("cannot found the active connection: %d-%d, err: %v", 
con.ConnectionID, con.RandomID, err)
+               return false
+       } else if activateConn.RandomID != 0 && activateConn.RandomID != 
con.RandomID {
+               log.Debugf("detect the connection: %d-%d is already closed(by 
difference random ID), so remove from the connection manager",
+                       con.ConnectionID, con.RandomID)
+               con.MarkDeletable = true
+               return false
+       }
+       return true
+}
+
 func (c *ConnectionManager) RemoveProcess(pid int32, entities 
[]api.ProcessInterface) {
        c.monitoringProcessLock.Lock()
        defer c.monitoringProcessLock.Unlock()
@@ -623,7 +682,7 @@ func (c *ConnectionManager) 
updateMonitorStatusForProcess(pid int32, monitor boo
 func (c *ConnectionManager) OnBuildConnectionLogFinished() {
        // delete all connections which marked as deletable
        // all deletable connection events been sent
-       deletableConnections := make(map[string]bool, 0)
+       deletableConnections := make(map[string]bool)
        now := time.Now()
        c.connections.IterCb(func(key string, v interface{}) {
                con, ok := v.(*ConnectionInfo)
@@ -632,6 +691,9 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
                }
                // already mark as deletable or process not monitoring
                shouldDelete := con.MarkDeletable || 
!c.ProcessIsMonitor(con.PID)
+               if !shouldDelete {
+                       shouldDelete = !c.checkConnectionIsExist(con)
+               }
 
                if shouldDelete && con.DeleteAfter == nil {
                        deleteAfterTime := now.Add(connectionDeleteDelayTime)
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 83e4ba4..6ecfd2f 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -36,6 +36,7 @@ type SocketDataUploadEvent struct {
        ConnectionID uint64
        RandomID     uint64
        DataID0      uint64
+       PrevDataID0  uint64
        TotalSize0   uint64
        Buffer       [2048]byte
 }
@@ -52,6 +53,7 @@ func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
        s.ConnectionID = r.ReadUint64()
        s.RandomID = r.ReadUint64()
        s.DataID0 = r.ReadUint64()
+       s.PrevDataID0 = r.ReadUint64()
        s.TotalSize0 = r.ReadUint64()
        r.ReadUint8Array(s.Buffer[:], 2048)
 }
@@ -96,6 +98,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
        return s.DataID0
 }
 
+func (s *SocketDataUploadEvent) PrevDataID() uint64 {
+       return s.PrevDataID0
+}
+
 func (s *SocketDataUploadEvent) DataSequence() int {
        return int(s.Sequence0)
 }
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index 18577de..bc1a6ad 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -32,14 +32,11 @@ import (
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/accesslog/events"
        "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
+       "github.com/apache/skywalking-rover/pkg/accesslog/sender"
        "github.com/apache/skywalking-rover/pkg/core"
        "github.com/apache/skywalking-rover/pkg/core/backend"
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/module"
-       "github.com/apache/skywalking-rover/pkg/process"
-       "github.com/apache/skywalking-rover/pkg/tools/host"
-
-       v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
@@ -53,8 +50,8 @@ type Runner struct {
        mgr        *module.Manager
        backendOp  backend.Operator
        cluster    string
-       alsClient  v3.EBPFAccessLogServiceClient
        ctx        context.Context
+       sender     *sender.GRPCSender
 }
 
 func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) {
@@ -70,17 +67,18 @@ func NewRunner(mgr *module.Manager, config *common.Config) 
(*Runner, error) {
        backendOP := coreModule.BackendOperator()
        clusterName := coreModule.ClusterName()
        monitorFilter := 
common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","), 
strings.Split(config.ExcludeClusters, ","))
+       connectionMgr := common.NewConnectionManager(config, mgr, bpfLoader, 
monitorFilter)
        runner := &Runner{
                context: &common.AccessLogContext{
                        BPF:           bpfLoader,
                        Config:        config,
-                       ConnectionMgr: common.NewConnectionManager(config, mgr, 
bpfLoader, monitorFilter),
+                       ConnectionMgr: connectionMgr,
                },
                collectors: collector.Collectors(),
                mgr:        mgr,
                backendOp:  backendOP,
                cluster:    clusterName,
-               alsClient:  
v3.NewEBPFAccessLogServiceClient(backendOP.GetConnection()),
+               sender:     sender.NewGRPCSender(mgr, connectionMgr),
        }
        runner.context.Queue = common.NewQueue(config.Flush.MaxCountOneStream, 
flushDuration, runner)
        return runner, nil
@@ -91,6 +89,7 @@ func (r *Runner) Start(ctx context.Context) error {
        r.context.RuntimeContext = ctx
        r.context.Queue.Start(ctx)
        r.context.ConnectionMgr.Start(ctx, r.context)
+       r.sender.Start(ctx)
        for _, c := range r.collectors {
                err := c.Start(r.mgr, r.context)
                if err != nil {
@@ -111,26 +110,20 @@ func (r *Runner) Consume(kernels chan *common.KernelLog, 
protocols chan *common.
                return
        }
 
-       allLogs := r.buildConnectionLogs(kernels, protocols)
-       log.Debugf("ready to send access log, connection count: %d", 
len(allLogs))
-       if len(allLogs) == 0 {
-               return
-       }
-       if err := r.sendLogs(allLogs); err != nil {
-               log.Warnf("send access log failure: %v", err)
-       }
+       batch := r.sender.NewBatch()
+       r.buildConnectionLogs(batch, kernels, protocols)
+       log.Debugf("ready to send access log, connection count: %d", 
batch.ConnectionCount())
+       r.sender.AddBatch(batch)
 }
 
-func (r *Runner) buildConnectionLogs(kernels chan *common.KernelLog, protocols 
chan *common.ProtocolLog) map[*common.ConnectionInfo]*connectionLogs {
-       result := make(map[*common.ConnectionInfo]*connectionLogs)
-       r.buildKernelLogs(kernels, result)
-       r.buildProtocolLogs(protocols, result)
+func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan 
*common.KernelLog, protocols chan *common.ProtocolLog) {
+       r.buildKernelLogs(kernels, batch)
+       r.buildProtocolLogs(protocols, batch)
 
        r.context.ConnectionMgr.OnBuildConnectionLogFinished()
-       return result
 }
 
-func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, result 
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch 
*sender.BatchLogs) {
        delayAppends := make([]*common.KernelLog, 0)
        for {
                select {
@@ -139,13 +132,7 @@ func (r *Runner) buildKernelLogs(kernels chan 
*common.KernelLog, result map[*com
                        log.Debugf("building kernel log result, connetaion ID: 
%d, random ID: %d, exist connection: %t, delay: %t",
                                kernelLog.Event.GetConnectionID(), 
kernelLog.Event.GetRandomID(), connection != nil, delay)
                        if connection != nil && curLog != nil {
-                               logs, exist := result[connection]
-                               if !exist {
-                                       logs = newConnectionLogs()
-                                       result[connection] = logs
-                               }
-
-                               logs.kernels = append(logs.kernels, curLog)
+                               batch.AppendKernelLog(connection, curLog)
                        } else if delay {
                                delayAppends = append(delayAppends, kernelLog)
                        }
@@ -162,7 +149,7 @@ func (r *Runner) buildKernelLogs(kernels chan 
*common.KernelLog, result map[*com
        }
 }
 
-func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, result 
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch 
*sender.BatchLogs) {
        delayAppends := make([]*common.ProtocolLog, 0)
        for {
                select {
@@ -178,15 +165,7 @@ func (r *Runner) buildProtocolLogs(protocols chan 
*common.ProtocolLog, result ma
                                        conID, randomID, connection != nil, 
delay)
                        }
                        if connection != nil && len(kernelLogs) > 0 && 
protocolLogs != nil {
-                               logs, exist := result[connection]
-                               if !exist {
-                                       logs = newConnectionLogs()
-                                       result[connection] = logs
-                               }
-                               logs.protocols = append(logs.protocols, 
&connectionProtocolLog{
-                                       kernels:  kernelLogs,
-                                       protocol: protocolLogs,
-                               })
+                               batch.AppendProtocolLog(connection, kernelLogs, 
protocolLogs)
                        } else if delay {
                                delayAppends = append(delayAppends, protocolLog)
                        }
@@ -203,95 +182,6 @@ func (r *Runner) buildProtocolLogs(protocols chan 
*common.ProtocolLog, result ma
        }
 }
 
-func (r *Runner) sendLogs(allLogs map[*common.ConnectionInfo]*connectionLogs) 
error {
-       timeout, cancelFunc := context.WithTimeout(r.ctx, time.Second*20)
-       defer cancelFunc()
-       streaming, err := r.alsClient.Collect(timeout)
-       if err != nil {
-               return err
-       }
-
-       firstLog := true
-       firstConnection := true
-       for connection, logs := range allLogs {
-               if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
-                       continue
-               }
-               if log.Enable(logrus.DebugLevel) {
-                       log.Debugf("ready to sending access log with 
connection, connection ID: %d, random ID: %d, "+
-                               "local: %s, remote: %s, role: %s, kernel logs 
count: %d, protocol log count: %d",
-                               connection.ConnectionID, connection.RandomID, 
connection.RPCConnection.Local, connection.RPCConnection.Remote,
-                               connection.RPCConnection.Role, 
len(logs.kernels), len(logs.protocols))
-               }
-
-               if len(logs.kernels) > 0 {
-                       r.sendLogToTheStream(streaming, 
r.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels, 
nil))
-                       firstLog, firstConnection = false, false
-               }
-               for _, protocolLog := range logs.protocols {
-                       r.sendLogToTheStream(streaming, 
r.buildAccessLogMessage(firstLog, firstConnection, connection, 
protocolLog.kernels, protocolLog.protocol))
-                       firstLog, firstConnection = false, false
-               }
-
-               firstConnection = true
-       }
-
-       if _, err := streaming.CloseAndRecv(); err != nil {
-               log.Warnf("closing the access log streaming error: %v", err)
-       }
-       return nil
-}
-
-func (r *Runner) sendLogToTheStream(streaming 
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) {
-       if err := streaming.Send(logMsg); err != nil {
-               log.Warnf("send access log failure: %v", err)
-       }
-}
-
-func (r *Runner) buildAccessLogMessage(firstLog, firstConnection bool, conn 
*common.ConnectionInfo,
-       kernelLogs []*v3.AccessLogKernelLog, protocolLog 
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
-       var rpcCon *v3.AccessLogConnection
-       if firstConnection {
-               rpcCon = conn.RPCConnection
-       }
-       return &v3.EBPFAccessLogMessage{
-               Node:        r.BuildNodeInfo(firstLog),
-               Connection:  rpcCon,
-               KernelLogs:  kernelLogs,
-               ProtocolLog: protocolLog,
-       }
-}
-
-func (r *Runner) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
-       if !needs {
-               return nil
-       }
-       netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
-       for i, n := range host.AllNetworkInterfaces() {
-               netInterfaces = append(netInterfaces, 
&v3.EBPFAccessLogNodeNetInterface{
-                       Index: int32(i),
-                       Mtu:   int32(n.MTU),
-                       Name:  n.Name,
-               })
-       }
-       return &v3.EBPFAccessLogNodeInfo{
-               Name:          
r.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
-               NetInterfaces: netInterfaces,
-               BootTime:      r.convertTimeToInstant(host.BootTime),
-               ClusterName:   r.cluster,
-               Policy: &v3.EBPFAccessLogPolicy{
-                       ExcludeNamespaces: 
r.context.ConnectionMgr.GetExcludeNamespaces(),
-               },
-       }
-}
-
-func (r *Runner) convertTimeToInstant(t time.Time) *v32.Instant {
-       return &v32.Instant{
-               Seconds: t.Unix(),
-               Nanos:   int32(t.Nanosecond()),
-       }
-}
-
 func (r *Runner) shouldReportProcessLog(pid uint32) bool {
        // if the process not monitoring, then check the process is existed or 
not
        if r.context.ConnectionMgr.ProcessIsMonitor(pid) {
@@ -364,20 +254,3 @@ func (r *Runner) Stop() error {
        r.context.ConnectionMgr.Stop()
        return nil
 }
-
-type connectionLogs struct {
-       kernels   []*v3.AccessLogKernelLog
-       protocols []*connectionProtocolLog
-}
-
-type connectionProtocolLog struct {
-       kernels  []*v3.AccessLogKernelLog
-       protocol *v3.AccessLogProtocolLogs
-}
-
-func newConnectionLogs() *connectionLogs {
-       return &connectionLogs{
-               kernels:   make([]*v3.AccessLogKernelLog, 0),
-               protocols: make([]*connectionProtocolLog, 0),
-       }
-}
diff --git a/pkg/accesslog/sender/logs.go b/pkg/accesslog/sender/logs.go
new file mode 100644
index 0000000..95f2710
--- /dev/null
+++ b/pkg/accesslog/sender/logs.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+       "github.com/apache/skywalking-rover/pkg/accesslog/common"
+
+       v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var maxLogsPerSend = 10_000
+
+type BatchLogs struct {
+       logs map[*common.ConnectionInfo]*ConnectionLogs
+}
+
+func newBatchLogs() *BatchLogs {
+       return &BatchLogs{
+               logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+       }
+}
+
+func (l *BatchLogs) ConnectionCount() int {
+       return len(l.logs)
+}
+
+func (l *BatchLogs) AppendKernelLog(connection *common.ConnectionInfo, log 
*v3.AccessLogKernelLog) {
+       logs, ok := l.logs[connection]
+       if !ok {
+               logs = newConnectionLogs()
+               l.logs[connection] = logs
+       }
+
+       logs.kernels = append(logs.kernels, log)
+}
+
+func (l *BatchLogs) AppendProtocolLog(connection *common.ConnectionInfo, 
kernels []*v3.AccessLogKernelLog, protocols *v3.AccessLogProtocolLogs) {
+       logs, ok := l.logs[connection]
+       if !ok {
+               logs = newConnectionLogs()
+               l.logs[connection] = logs
+       }
+
+       logs.protocols = append(logs.protocols, &ConnectionProtocolLog{
+               kernels:  kernels,
+               protocol: protocols,
+       })
+}
+
+func (l *BatchLogs) splitBatchLogs() []*BatchLogs {
+       logsCount := len(l.logs)
+       if logsCount == 0 {
+               return nil
+       }
+       splitCount := logsCount / maxLogsPerSend
+       if logsCount%maxLogsPerSend != 0 {
+               splitCount++
+       }
+       result := make([]*BatchLogs, 0, splitCount)
+
+       // split the connections by maxLogsPerSend
+       currentCount := 0
+       var currentBatch *BatchLogs
+       for connection, logs := range l.logs {
+               if currentCount%maxLogsPerSend == 0 {
+                       currentBatch = newBatchLogs()
+                       result = append(result, currentBatch)
+                       currentCount = 0
+               }
+               currentBatch.logs[connection] = logs
+               currentCount++
+       }
+
+       return result
+}
+
+type ConnectionLogs struct {
+       kernels   []*v3.AccessLogKernelLog
+       protocols []*ConnectionProtocolLog
+}
+
+type ConnectionProtocolLog struct {
+       kernels  []*v3.AccessLogKernelLog
+       protocol *v3.AccessLogProtocolLogs
+}
+
+func newConnectionLogs() *ConnectionLogs {
+       return &ConnectionLogs{
+               kernels:   make([]*v3.AccessLogKernelLog, 0),
+               protocols: make([]*ConnectionProtocolLog, 0),
+       }
+}
diff --git a/pkg/accesslog/sender/sender.go b/pkg/accesslog/sender/sender.go
new file mode 100644
index 0000000..ad6e921
--- /dev/null
+++ b/pkg/accesslog/sender/sender.go
@@ -0,0 +1,234 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+       "container/list"
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-rover/pkg/accesslog/common"
+       "github.com/apache/skywalking-rover/pkg/core"
+       "github.com/apache/skywalking-rover/pkg/logger"
+       "github.com/apache/skywalking-rover/pkg/module"
+       "github.com/apache/skywalking-rover/pkg/process"
+       "github.com/apache/skywalking-rover/pkg/tools/host"
+
+       "github.com/sirupsen/logrus"
+
+       v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
+       v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var log = logger.GetLogger("accesslog", "sender")
+
+// GRPCSender Async to sending the access log to the backend
+type GRPCSender struct {
+       logs   *list.List
+       notify chan bool
+       mutex  sync.Mutex
+       ctx    context.Context
+
+       mgr           *module.Manager
+       connectionMgr *common.ConnectionManager
+       alsClient     v3.EBPFAccessLogServiceClient
+       clusterName   string
+}
+
+// NewGRPCSender creates a new GRPCSender
+func NewGRPCSender(mgr *module.Manager, connectionMgr 
*common.ConnectionManager) *GRPCSender {
+       return &GRPCSender{
+               logs:          list.New(),
+               notify:        make(chan bool, 1),
+               mgr:           mgr,
+               connectionMgr: connectionMgr,
+               clusterName:   
mgr.FindModule(core.ModuleName).(core.Operator).ClusterName(),
+               alsClient: 
v3.NewEBPFAccessLogServiceClient(mgr.FindModule(core.ModuleName).(core.Operator).
+                       BackendOperator().GetConnection()),
+       }
+}
+
+func (g *GRPCSender) Start(ctx context.Context) {
+       g.ctx = ctx
+       go func() {
+               for {
+                       select {
+                       case <-g.notify:
+                               if count, err := g.handleLogs(); err != nil {
+                                       log.Warnf("sending access log error, 
lost %d logs, error: %v", count, err)
+                               }
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+}
+
+func (g *GRPCSender) NewBatch() *BatchLogs {
+       return &BatchLogs{
+               logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+       }
+}
+
+func (g *GRPCSender) AddBatch(batch *BatchLogs) {
+       // split logs
+       splitLogs := batch.splitBatchLogs()
+
+       // append the resend logs
+       g.mutex.Lock()
+       defer g.mutex.Unlock()
+       for _, l := range splitLogs {
+               g.logs.PushBack(l)
+       }
+
+       // notify the sender
+       select {
+       case g.notify <- true:
+       default:
+       }
+}
+
+func (g *GRPCSender) handleLogs() (int, error) {
+       for {
+               // pop logs
+               logs := g.popLogs()
+               if logs == nil {
+                       return 0, nil
+               }
+               // send logs
+               now := time.Now()
+               if err := g.sendLogs(logs); err != nil {
+                       return len(logs.logs), err
+               }
+               log.Infof("sending access log success, connection count: %d, 
use time: %s",
+                       logs.ConnectionCount(), time.Since(now).String())
+       }
+}
+
+func (g *GRPCSender) sendLogs(batch *BatchLogs) error {
+       timeout, cancelFunc := context.WithTimeout(g.ctx, time.Second*20)
+       defer cancelFunc()
+       streaming, err := g.alsClient.Collect(timeout)
+       if err != nil {
+               return err
+       }
+
+       firstLog := true
+       firstConnection := true
+       var sendError error
+       for connection, logs := range batch.logs {
+               if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
+                       continue
+               }
+               if log.Enable(logrus.DebugLevel) {
+                       log.Debugf("ready to sending access log with 
connection, connection ID: %d, random ID: %d, "+
+                               "local: %s, remote: %s, role: %s, contains 
ztunnel address: %t, kernel logs count: %d, protocol log count: %d",
+                               connection.ConnectionID, connection.RandomID, 
connection.RPCConnection.Local, connection.RPCConnection.Remote,
+                               connection.RPCConnection.Role, 
connection.RPCConnection.Attachment != nil, len(logs.kernels), 
len(logs.protocols))
+               }
+
+               if len(logs.kernels) > 0 {
+                       sendError = g.sendLogToTheStream(streaming, 
g.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels, 
nil))
+                       firstLog, firstConnection = false, false
+               }
+               for _, protocolLog := range logs.protocols {
+                       sendError = g.sendLogToTheStream(streaming, 
g.buildAccessLogMessage(firstLog, firstConnection, connection, 
protocolLog.kernels, protocolLog.protocol))
+                       firstLog, firstConnection = false, false
+               }
+               if sendError != nil {
+                       g.closeStream(streaming)
+                       return fmt.Errorf("sending access log error: %v", 
sendError)
+               }
+
+               firstConnection = true
+       }
+
+       g.closeStream(streaming)
+       return nil
+}
+
+func (g *GRPCSender) closeStream(s v3.EBPFAccessLogService_CollectClient) {
+       if _, err := s.CloseAndRecv(); err != nil {
+               log.Warnf("closing the access log streaming error: %v", err)
+       }
+}
+
+func (g *GRPCSender) sendLogToTheStream(streaming 
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) error {
+       if err := streaming.Send(logMsg); err != nil {
+               return err
+       }
+       return nil
+}
+
+func (g *GRPCSender) buildAccessLogMessage(firstLog, firstConnection bool, 
conn *common.ConnectionInfo,
+       kernelLogs []*v3.AccessLogKernelLog, protocolLog 
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
+       var rpcCon *v3.AccessLogConnection
+       if firstConnection {
+               rpcCon = conn.RPCConnection
+       }
+       return &v3.EBPFAccessLogMessage{
+               Node:        g.BuildNodeInfo(firstLog),
+               Connection:  rpcCon,
+               KernelLogs:  kernelLogs,
+               ProtocolLog: protocolLog,
+       }
+}
+
+func (g *GRPCSender) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
+       if !needs {
+               return nil
+       }
+       netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
+       for i, n := range host.AllNetworkInterfaces() {
+               netInterfaces = append(netInterfaces, 
&v3.EBPFAccessLogNodeNetInterface{
+                       Index: int32(i),
+                       Mtu:   int32(n.MTU),
+                       Name:  n.Name,
+               })
+       }
+       return &v3.EBPFAccessLogNodeInfo{
+               Name:          
g.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
+               NetInterfaces: netInterfaces,
+               BootTime:      g.convertTimeToInstant(host.BootTime),
+               ClusterName:   g.clusterName,
+               Policy: &v3.EBPFAccessLogPolicy{
+                       ExcludeNamespaces: 
g.connectionMgr.GetExcludeNamespaces(),
+               },
+       }
+}
+
+func (g *GRPCSender) convertTimeToInstant(t time.Time) *v32.Instant {
+       return &v32.Instant{
+               Seconds: t.Unix(),
+               Nanos:   int32(t.Nanosecond()),
+       }
+}
+
+func (g *GRPCSender) popLogs() *BatchLogs {
+       g.mutex.Lock()
+       defer g.mutex.Unlock()
+       if g.logs.Len() == 0 {
+               return nil
+       }
+       e := g.logs.Front()
+       logs := e.Value.(*BatchLogs)
+       g.logs.Remove(e)
+       return logs
+}
diff --git a/pkg/logger/settings.go b/pkg/logger/settings.go
index a56c6ab..7705bb9 100644
--- a/pkg/logger/settings.go
+++ b/pkg/logger/settings.go
@@ -17,7 +17,9 @@
 
 package logger
 
-import "github.com/sirupsen/logrus"
+import (
+       "github.com/sirupsen/logrus"
+)
 
 const (
        DefaultLoggerLevel = logrus.InfoLevel
diff --git a/pkg/profiling/task/network/analyze/events/data.go 
b/pkg/profiling/task/network/analyze/events/data.go
index 1f71fa8..bc451fc 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -35,6 +35,7 @@ type SocketDataUploadEvent struct {
        ConnectionID uint64
        RandomID     uint64
        DataID0      uint64
+       PrevDataID0  uint64
        TotalSize0   uint64
        Buffer       [2048]byte
 }
@@ -79,6 +80,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
        return s.DataID0
 }
 
// PrevDataID returns the previous data id recorded for this upload event
// (the PrevDataID0 field filled by the eBPF side).
func (s *SocketDataUploadEvent) PrevDataID() uint64 {
	return s.PrevDataID0
}
+
 func (s *SocketDataUploadEvent) DataSequence() int {
        return int(s.Sequence0)
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 707aaa8..511a088 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -29,7 +29,7 @@ import (
 )
 
 func (l *Listener) initSocketDataQueue(parallels, queueSize int, config 
*profiling.TaskConfig) {
-       l.socketDataQueue = btf.NewEventQueue(parallels, queueSize, func(num 
int) btf.PartitionContext {
+       l.socketDataQueue = btf.NewEventQueue("socket data resolver", 
parallels, queueSize, func(num int) btf.PartitionContext {
                return NewSocketDataPartitionContext(l, config)
        })
 }
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 7224727..f966743 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -57,6 +57,13 @@ func (r *Request) MinDataID() int {
        return int(r.headerBuffer.FirstSocketBuffer().DataID())
 }
 
+func (r *Request) MaxDataID() int {
+       if r.bodyBuffer != nil {
+               return int(r.bodyBuffer.LastSocketBuffer().DataID())
+       }
+       return int(r.headerBuffer.LastSocketBuffer().DataID())
+}
+
 func (r *Request) Original() *http.Request {
        return r.original
 }
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 307f29f..c220664 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -62,7 +62,6 @@ func GetEBPFCollectionOptionsIfNeed(bpfSpec 
*ebpf.CollectionSpec) *ebpf.Collecti
                spec = readSpec
        })
 
-       enhanceDataQueueOpts(bpfSpec)
        return &ebpf.CollectionOptions{Programs: 
ebpf.ProgramOptions{KernelTypes: spec}}
 }
 
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 867d84e..bdbd7a6 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -83,6 +83,7 @@ type Linker struct {
        closeOnce sync.Once
 
        linkedUProbes map[string]bool
+       linkMutex     sync.Mutex
 }
 
 func NewLinker() *Linker {
@@ -136,6 +137,8 @@ func (m *Linker) AddSysCallWithKProbe(call string, linkK 
LinkFunc, p *ebpf.Progr
        if p == nil {
                return
        }
+       m.linkMutex.Lock()
+       defer m.linkMutex.Unlock()
        kprobe, err := linkK(syscallPrefix+call, p, nil)
 
        if err != nil {
@@ -147,10 +150,13 @@ func (m *Linker) AddSysCallWithKProbe(call string, linkK 
LinkFunc, p *ebpf.Progr
 }
 
 func (m *Linker) AddTracePoint(sys, name string, p *ebpf.Program) {
+       m.linkMutex.Lock()
+       defer m.linkMutex.Unlock()
        l, e := link.Tracepoint(sys, name, p, nil)
        if e != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open %s 
error: %v", name, e))
        } else {
+               log.Debugf("attach to the tracepoint: sys: %s, name: %s", sys, 
name)
                m.closers = append(m.closers, l)
        }
 }
@@ -264,6 +270,8 @@ func (u *UProbeExeFile) AddLinkWithType(symbol string, 
enter bool, p *ebpf.Progr
 }
 
 func (u *UProbeExeFile) addLinkWithType0(symbol string, enter bool, p 
*ebpf.Program, customizeAddress uint64) (link.Link, error) {
+       u.linker.linkMutex.Lock()
+       defer u.linker.linkMutex.Unlock()
        // check already linked
        uprobeIdentity := fmt.Sprintf("%s_%s_%t_%d", u.addr, symbol, enter, 
customizeAddress)
        if u.linker.linkedUProbes[uprobeIdentity] {
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 290f717..c5ce07b 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -21,9 +21,8 @@ import (
        "context"
        "fmt"
        "hash/fnv"
-       "os"
-       "strings"
        "sync"
+       "time"
 
        "github.com/cilium/ebpf"
        "github.com/cilium/ebpf/perf"
@@ -32,50 +31,9 @@ import (
 
 const dataQueuePrefix = "rover_data_queue_"
 
-var (
-       ringbufChecker   sync.Once
-       ringbufAvailable bool
-)
-
-func isRingbufAvailable() bool {
-       ringbufChecker.Do(func() {
-               buf, err := ebpf.NewMap(&ebpf.MapSpec{
-                       Type:       ebpf.RingBuf,
-                       MaxEntries: uint32(os.Getpagesize()),
-               })
-
-               buf.Close()
-
-               ringbufAvailable = err == nil
-
-               if ringbufAvailable {
-                       log.Infof("detect the ring buffer is available in 
current system for enhancement of data queue")
-               }
-       })
-
-       return ringbufAvailable
-}
-
-func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) {
-       it := bpfSpec.Types.Iterate()
-       for it.Next() {
-               if !strings.HasPrefix(it.Type.TypeName(), dataQueuePrefix) {
-                       continue
-               }
-               if err := validateGlobalConstVoidPtrVar(it.Type); err != nil {
-                       panic(fmt.Errorf("invalid global const void ptr var %s: 
%v", it.Type.TypeName(), err))
-               }
-
-               // if the ringbuf not available, use perf event array
-               if !isRingbufAvailable() {
-                       mapName := strings.TrimPrefix(it.Type.TypeName(), 
dataQueuePrefix)
-                       mapSpec := bpfSpec.Maps[mapName]
-                       mapSpec.Type = ebpf.PerfEventArray
-                       mapSpec.KeySize = 4
-                       mapSpec.ValueSize = 4
-               }
-       }
-}
+// queueChannelReducingCountCheckInterval is the interval to check the queue channel reducing count.
+// If the reducing count is almost full, a warning log is added.
+const queueChannelReducingCountCheckInterval = time.Second * 5
 
 type queueReader interface {
        Read() ([]byte, error)
@@ -125,6 +83,7 @@ func (p *perfQueueReader) Close() error {
 
 type ringBufReader struct {
        reader *ringbuf.Reader
+       name   string
 }
 
 func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
@@ -132,7 +91,7 @@ func newRingBufReader(emap *ebpf.Map) (*ringBufReader, 
error) {
        if err != nil {
                return nil, err
        }
-       return &ringBufReader{reader: reader}, nil
+       return &ringBufReader{reader: reader, name: emap.String()}, nil
 }
 
 func (r *ringBufReader) Read() ([]byte, error) {
@@ -153,6 +112,7 @@ type PartitionContext interface {
 }
 
 type EventQueue struct {
+       name       string
        count      int
        receivers  []*mapReceiver
        partitions []*partition
@@ -168,12 +128,12 @@ type mapReceiver struct {
        parallels    int
 }
 
-func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator 
func(partitionNum int) PartitionContext) *EventQueue {
+func NewEventQueue(name string, partitionCount, sizePerPartition int, 
contextGenerator func(partitionNum int) PartitionContext) *EventQueue {
        partitions := make([]*partition, 0)
        for i := 0; i < partitionCount; i++ {
                partitions = append(partitions, newPartition(i, 
sizePerPartition, contextGenerator(i)))
        }
-       return &EventQueue{count: partitionCount, partitions: partitions}
+       return &EventQueue{name: name, count: partitionCount, partitions: 
partitions}
 }
 
 func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataSupplier func() interface{},
@@ -236,6 +196,27 @@ func (e *EventQueue) start0(ctx context.Context, linker 
*Linker) {
                        }
                }(ctx, i)
        }
+
+       // channel reducing count check
+       go func() {
+               ticker := time.NewTicker(queueChannelReducingCountCheckInterval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-ticker.C:
+                               for _, p := range e.partitions {
+                                       reducingCount := len(p.channel)
+                                       if reducingCount > cap(p.channel)*9/10 {
+                                               log.Warnf("queue %s partition 
%d reducing count is almost full, "+
+                                                       "please trying to 
increase the parallels count or queue size, status: %d/%d",
+                                                       e.name, p.index, 
reducingCount, cap(p.channel))
+                                       }
+                               }
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
 }
 
 func (e *EventQueue) routerTransformer(data interface{}, routeGenerator 
func(data interface{}) string) {
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index c16a9fd..adf35ff 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -20,6 +20,7 @@ package buffer
 import (
        "container/list"
        "errors"
+       "fmt"
        "io"
        "sync"
        "time"
@@ -55,6 +56,8 @@ type SocketDataBuffer interface {
        BufferLen() int
        // DataID data id of the buffer
        DataID() uint64
+       // PrevDataID the previous data id of the buffer
+       PrevDataID() uint64
        // DataSequence the data sequence under same data id
        DataSequence() int
        // IsStart this buffer is start of the same data id
@@ -83,11 +86,19 @@ type DataIDRange struct {
        IsToBufferReadFinished bool
 }
 
+func (r *DataIDRange) String() string {
+       return fmt.Sprintf("from: %d, to: %d, isToBufferReadFinished: %t", 
r.From, r.To, r.IsToBufferReadFinished)
+}
+
 type Buffer struct {
        dataEvents   *list.List
        detailEvents *list.List
        validated    bool // the events list is validated or not
 
+       // shouldResetPosition means a new buffer has been appended into the buffer,
+       // so the position should be reset to the head and the buffer re-read
+       shouldResetPosition bool
+
        eventLocker sync.RWMutex
 
        head    *Position
@@ -184,6 +195,10 @@ func (p *Position) DataID() uint64 {
        return p.element.Value.(SocketDataBuffer).DataID()
 }
 
+func (p *Position) PrevDataID() uint64 {
+       return p.element.Value.(SocketDataBuffer).PrevDataID()
+}
+
 func (p *Position) Seq() int {
        return p.element.Value.(SocketDataBuffer).DataSequence()
 }
@@ -609,6 +624,12 @@ func (r *Buffer) PrepareForReading() bool {
        if r.dataEvents.Len() == 0 {
                return false
        }
+       // if the buffer should be reset, then reset the position; it cannot be read from the current position
+       if r.shouldResetPosition {
+               r.ResetForLoopReading()
+               r.shouldResetPosition = false
+               return false
+       }
        if r.head == nil || r.head.element == nil {
                // read in the first element
                r.eventLocker.RLock()
@@ -741,8 +762,16 @@ func (r *Buffer) AppendDataEvent(event SocketDataBuffer) {
        r.eventLocker.Lock()
        defer r.eventLocker.Unlock()
 
+       defer func() {
+               // if the current position is not nil and the current reading 
data id is bigger than the event data id
+               if r.current != nil && r.current.DataID() > event.DataID() {
+                       r.shouldResetPosition = true
+               }
+       }()
+
        if r.dataEvents.Len() == 0 {
                r.dataEvents.PushFront(event)
+               r.shouldResetPosition = true
                return
        }
        if r.dataEvents.Back().Value.(SocketDataBuffer).DataID() < 
event.DataID() {
@@ -802,6 +831,8 @@ func (r *Buffer) DeleteExpireEvents(expireDuration 
time.Duration) int {
 }
 
 func (r *Buffer) DataLength() int {
+       r.eventLocker.RLock()
+       defer r.eventLocker.RUnlock()
        if r.dataEvents == nil {
                return 0
        }
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index b1357cd..fff1ec2 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -18,11 +18,10 @@
 package ip
 
 import (
+       "github.com/apache/skywalking-rover/pkg/logger"
        "net"
        "syscall"
 
-       "github.com/apache/skywalking-rover/pkg/logger"
-
        "github.com/florianl/go-conntrack"
 
        "golang.org/x/sys/unix"
@@ -36,9 +35,6 @@ var numberStrategies = []struct {
 }{{
        name:  "tcp",
        proto: syscall.IPPROTO_TCP,
-}, {
-       name:  "udp",
-       proto: syscall.IPPROTO_UDP,
 }}
 
 type ConnTrack struct {
@@ -77,19 +73,6 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) 
bool {
                        return true
                }
        }
-
-       // using dump to query protocol
-       dump, e := c.tracker.Dump(conntrack.Conntrack, family)
-       if e != nil {
-               log.Debug("cannot dump the conntrack session, error: ", e)
-               return false
-       }
-       if res := c.filterValidateReply(dump, tuple); res != nil {
-               addr.DestIP = res.Src.String()
-               log.Debugf("found the connection from the dump all conntrack, 
src: %s:%d, dst: %s:%d, proto number: %d",
-                       tuple.Src, *tuple.Proto.SrcPort, tuple.Dst, 
*tuple.Proto.DstPort, *tuple.Proto.Number)
-               return true
-       }
        return false
 }
 

Reply via email to