This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 78fb9c4 Improve the performance of access log module (#171)
78fb9c4 is described below
commit 78fb9c48b19e2a11d0c67b0097f02552aadb8116
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 23 21:24:56 2024 +0800
Improve the performance of access log module (#171)
---
CHANGES.md | 5 +-
bpf/accesslog/common/connection.h | 6 +-
bpf/accesslog/syscalls/transfer.h | 2 +-
bpf/include/queue.h | 26 +--
bpf/include/socket_data.h | 29 ++-
pkg/accesslog/collector/connection.go | 2 +-
pkg/accesslog/collector/protocols/http1.go | 29 ++-
pkg/accesslog/collector/protocols/queue.go | 22 +-
pkg/accesslog/collector/tls.go | 13 ++
pkg/accesslog/common/connection.go | 97 +++++++--
pkg/accesslog/events/data.go | 6 +
pkg/accesslog/events/events_test.go | 231 ++++++++++----------
pkg/accesslog/runner.go | 160 ++------------
pkg/accesslog/sender/logs.go | 107 ++++++++++
pkg/accesslog/sender/sender.go | 236 +++++++++++++++++++++
pkg/logger/settings.go | 4 +-
.../task/network/analyze/base/connection.go | 3 +-
pkg/profiling/task/network/analyze/base/context.go | 14 +-
pkg/profiling/task/network/analyze/events/data.go | 5 +
.../task/network/analyze/layer4/listener.go | 5 +-
.../task/network/analyze/layer7/events.go | 2 +-
.../analyze/layer7/protocols/base/analyzer.go | 3 +-
.../analyze/layer7/protocols/base/protocol.go | 2 +-
.../analyze/layer7/protocols/http1/analyzer.go | 19 +-
.../layer7/protocols/http1/reader/request.go | 7 +
pkg/tools/btf/ebpf.go | 30 ---
pkg/tools/btf/linker.go | 8 +
pkg/tools/btf/queue.go | 81 +++----
pkg/tools/buffer/buffer.go | 31 +++
pkg/tools/ip/conntrack.go | 20 +-
scripts/build/bash/btfgen.sh | 1 +
31 files changed, 772 insertions(+), 434 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a27176d..8cb3d99 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,7 +16,10 @@ Release Notes.
* Support parallel parsing protocol data in the access log module.
* Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
* Reduce missing details issue in the access log module.
-* Introduce ringbuf queue to improve performance in the access log module.
+* Improve HTTP/1.x protocol parsing strategy to handle missing data.
+* Add gRPC sender to send the access log to the backend.
+* Add warning log when the event queue is almost full in the access log module.
+* Reduce unnecessary `conntrack` queries when detecting new connections.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/bpf/accesslog/common/connection.h
b/bpf/accesslog/common/connection.h
index cb799e8..880826d 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -22,6 +22,7 @@
#include "data_args.h"
#include "socket_opts.h"
#include "queue.h"
+#include "socket_data.h"
// syscall:connect
struct connect_args_t {
@@ -107,7 +108,7 @@ struct socket_connect_event_t {
__u64 conntrack_upstream_iph;
__u32 conntrack_upstream_port;
};
-DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
+DATA_QUEUE(socket_connection_event_queue);
// active connection cached into the hashmap
// if connection closed, then deleted
@@ -159,7 +160,7 @@ struct socket_close_event_t {
// close success
__u32 success;
};
-DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
+DATA_QUEUE(socket_close_event_queue);
static __inline bool family_should_trace(const __u32 family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ?
false : true;
@@ -303,6 +304,7 @@ static __inline void submit_connection_when_not_exists(void
*ctx, __u64 id, stru
}
static __inline void notify_close_connection(void* ctx, __u64 conid, struct
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
+ bpf_map_delete_elem(&socket_data_last_id_map, &conid);
struct socket_close_event_t *close_event;
close_event = rover_reserve_buf(&socket_close_event_queue,
sizeof(*close_event));
if (close_event == NULL) {
diff --git a/bpf/accesslog/syscalls/transfer.h
b/bpf/accesslog/syscalls/transfer.h
index 053ce01..d189ad2 100644
--- a/bpf/accesslog/syscalls/transfer.h
+++ b/bpf/accesslog/syscalls/transfer.h
@@ -69,7 +69,7 @@ struct socket_detail_t {
__u8 ssl;
};
-DATA_QUEUE(socket_detail_queue, 1024 * 1024);
+DATA_QUEUE(socket_detail_queue);
static __always_inline void process_write_data(void *ctx, __u64 id, struct
sock_data_args_t *args, ssize_t bytes_count,
__u32 data_direction, const bool vecs,
__u8 func_name, bool ssl) {
diff --git a/bpf/include/queue.h b/bpf/include/queue.h
index c6b532c..559aeb4 100644
--- a/bpf/include/queue.h
+++ b/bpf/include/queue.h
@@ -19,12 +19,10 @@
#include "api.h"
-#define DATA_QUEUE(name, size) \
- struct { \
- __uint(type, BPF_MAP_TYPE_RINGBUF); \
- __uint(max_entries, size); \
- } name SEC(".maps"); \
- const void *rover_data_queue_##name __attribute__((unused));
+#define DATA_QUEUE(name) \
+ struct { \
+ __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);\
+ } name SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
@@ -36,26 +34,12 @@ struct {
static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
static const int zero = 0;
- if (bpf_core_enum_value_exists(enum bpf_func_id,
- BPF_FUNC_ringbuf_reserve))
- return bpf_ringbuf_reserve(map, size, 0);
-
return bpf_map_lookup_elem(&rover_data_heap, &zero);
}
-static __always_inline void rover_discard_buf(void *buf)
-{
- if (bpf_core_enum_value_exists(enum bpf_func_id,
- BPF_FUNC_ringbuf_discard))
- bpf_ringbuf_discard(buf, 0);
+static __always_inline void rover_discard_buf(void *buf) {
}
static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf,
__u64 size) {
- if (bpf_core_enum_value_exists(enum bpf_func_id,
- BPF_FUNC_ringbuf_submit)) {
- bpf_ringbuf_submit(buf, 0);
- return 0;
- }
-
return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
}
\ No newline at end of file
diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h
index 0d62ccb..248b34c 100644
--- a/bpf/include/socket_data.h
+++ b/bpf/include/socket_data.h
@@ -35,6 +35,7 @@ struct socket_data_upload_event {
__u64 conid;
__u64 randomid;
__u64 data_id;
+ __u64 prev_data_id;
__u64 total_size;
char buffer[MAX_TRANSMIT_SOCKET_READ_LENGTH];
};
@@ -44,7 +45,7 @@ struct {
__type(value, struct socket_data_upload_event);
__uint(max_entries, 1);
} socket_data_upload_event_per_cpu_map SEC(".maps");
-DATA_QUEUE(socket_data_upload_queue, 1024 * 1024);
+DATA_QUEUE(socket_data_upload_queue);
struct socket_data_sequence_t {
__u64 data_id;
@@ -81,6 +82,7 @@ struct upload_data_args {
__u64 random_id;
__u64 socket_data_id;
+ __u64 prev_socket_data_id;
struct iovec *socket_data_iovec;
size_t socket_data_iovlen;
ssize_t bytes_count;
@@ -129,11 +131,13 @@ static __always_inline void
__upload_socket_data_with_buffer(void *ctx, __u8 ind
socket_data_event->randomid = args->random_id;
socket_data_event->total_size = args->bytes_count;
socket_data_event->data_id = args->socket_data_id;
+ socket_data_event->prev_data_id = args->prev_socket_data_id;
socket_data_event->sequence = index;
socket_data_event->data_len = size;
socket_data_event->finished = is_finished;
socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
+ asm volatile("%[size] &= 0x7ff;\n" ::[size] "+r"(size) :);
bpf_probe_read(&socket_data_event->buffer, size, buf);
rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event,
sizeof(*socket_data_event));
}
@@ -208,15 +212,38 @@ static __always_inline void upload_socket_data_iov(void
*ctx, struct iovec* iov,
UPLOAD_PER_SOCKET_DATA_IOV();
}
+struct socket_data_last_id_t {
+ __u64 random_id;
+ __u64 socket_data_id;
+};
+struct {
+ __uint(type, BPF_MAP_TYPE_LRU_HASH);
+ __uint(max_entries, 10000);
+ __type(key, __u64);
+ __type(value, struct socket_data_last_id_t);
+} socket_data_last_id_map SEC(".maps");
+
static __inline void upload_socket_data(void *ctx, struct upload_data_args
*args) {
// must have protocol and ssl must same(plain)
// if the connection data is needs to skip upload, then skip
if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN ||
args->connection_ssl != args->socket_data_ssl ||
args->connection_skip_data_upload == 1) {
return;
}
+ struct socket_data_last_id_t *latest =
bpf_map_lookup_elem(&socket_data_last_id_map, &args->con_id);
+ args->prev_socket_data_id = 0;
+ if (latest != NULL && latest->random_id == args->random_id) {
+ args->prev_socket_data_id = latest->socket_data_id;
+ }
if (args->socket_data_buf != NULL) {
upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count,
args, args->socket_ssl_buffer_force_unfinished);
} else if (args->socket_data_iovec != NULL) {
upload_socket_data_iov(ctx, args->socket_data_iovec,
args->socket_data_iovlen, args->bytes_count, args);
}
+
+ if (latest == NULL || latest->socket_data_id != args->socket_data_id) {
+ struct socket_data_last_id_t data = {};
+ data.random_id = args->random_id;
+ data.socket_data_id = args->socket_data_id;
+ bpf_map_update_elem(&socket_data_last_id_map, &args->con_id, &data,
BPF_ANY);
+ }
}
\ No newline at end of file
diff --git a/pkg/accesslog/collector/connection.go
b/pkg/accesslog/collector/connection.go
index a5565a1..d861447 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -77,7 +77,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext
if err != nil {
connectionLogger.Warnf("cannot create the connection tracker,
%v", err)
}
- c.eventQueue =
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
+ c.eventQueue = btf.NewEventQueue("connection resolver",
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int)
btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
})
diff --git a/pkg/accesslog/collector/protocols/http1.go
b/pkg/accesslog/collector/protocols/http1.go
index 9c17160..227422f 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -90,9 +90,6 @@ func (p *HTTP1Protocol) Analyze(connection
*PartitionConnection, _ *AnalyzeHelpe
}
messageType, err := p.reader.IdentityMessageType(buf)
- log.Debugf("ready to reading message type, messageType: %v,
buf: %p, data id: %d, "+
- "connection ID: %d, random ID: %d, error: %v",
messageType, buf, buf.Position().DataID(),
- metrics.ConnectionID, metrics.RandomID, err)
if err != nil {
http1Log.Debugf("failed to identity message type, %v",
err)
if buf.SkipCurrentElement() {
@@ -115,6 +112,9 @@ func (p *HTTP1Protocol) Analyze(connection
*PartitionConnection, _ *AnalyzeHelpe
metrics.ConnectionID, metrics.RandomID,
buf.Position().DataID(), err)
}
+ http1Log.Debugf("readed message, messageType: %v, buf: %p, data
id: %d, "+
+ "connection ID: %d, random ID: %d, metrics : %p, handle
result: %d",
+ messageType, buf, buf.Position().DataID(),
metrics.ConnectionID, metrics.RandomID, metrics, result)
finishReading := false
switch result {
case enums.ParseResultSuccess:
@@ -149,13 +149,13 @@ func (p *HTTP1Protocol) handleRequest(metrics
*HTTP1Metrics, buf *buffer.Buffer)
}
func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b
*buffer.Buffer) (enums.ParseResult, error) {
- firstRequest := metrics.halfRequests.Front()
- if firstRequest == nil {
- log.Debugf("cannot found request for response, skip response,
connection ID: %d, random ID: %d",
- metrics.ConnectionID, metrics.RandomID)
+ request := metrics.findMatchesRequest(b.Position().DataID(),
b.Position().PrevDataID())
+ if request == nil {
+ log.Debugf("cannot found request for response, skip response,
connection ID: %d, random ID: %d, "+
+ "required prev data id: %d, current data id: %d",
+ metrics.ConnectionID, metrics.RandomID,
b.Position().PrevDataID(), b.Position().DataID())
return enums.ParseResultSkipPackage, nil
}
- request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
// parsing response
response, result, err := p.reader.ReadResponse(request, b, true)
@@ -286,3 +286,16 @@ func (m *HTTP1Metrics) appendRequestToList(req
*reader.Request) {
m.halfRequests.PushBack(req)
}
}
+
+func (m *HTTP1Metrics) findMatchesRequest(currentDataID, prevDataID uint64)
*reader.Request {
+ for element := m.halfRequests.Front(); element != nil; element =
element.Next() {
+ req := element.Value.(*reader.Request)
+ // if the tail data id of request is equals to the prev data id
of response
+ // or tail request data id+1==first response data id, then
return the request
+ if uint64(req.MaxDataID()) == prevDataID ||
uint64(req.MaxDataID()+1) == currentDataID {
+ m.halfRequests.Remove(element)
+ return req
+ }
+ }
+ return nil
+}
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index 7e07eb3..c9789b7 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -88,7 +88,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext)
(*AnalyzeQueue, error) {
}
func (q *AnalyzeQueue) Start(ctx context.Context) {
- q.eventQueue =
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels,
q.context.Config.ProtocolAnalyze.QueueSize,
+ q.eventQueue = btf.NewEventQueue("socket data analyzer",
+ q.context.Config.ProtocolAnalyze.AnalyzeParallels,
q.context.Config.ProtocolAnalyze.QueueSize,
func(num int) btf.PartitionContext {
return NewPartitionContext(q.context, num,
q.supportAnalyzers(q.context))
})
@@ -128,12 +129,13 @@ type PartitionContext struct {
func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID
uint64,
protocol enums.ConnectionProtocol, currentDataID uint64)
*PartitionConnection {
connection := &PartitionConnection{
- connectionID: conID,
- randomID: randomID,
- dataBuffers:
make(map[enums.ConnectionProtocol]*buffer.Buffer),
- protocol: make(map[enums.ConnectionProtocol]uint64),
- protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
- protocolMetrics:
make(map[enums.ConnectionProtocol]ProtocolMetrics),
+ connectionID: conID,
+ randomID: randomID,
+ dataBuffers:
make(map[enums.ConnectionProtocol]*buffer.Buffer),
+ protocol: make(map[enums.ConnectionProtocol]uint64),
+ protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
+ protocolMetrics:
make(map[enums.ConnectionProtocol]ProtocolMetrics),
+ lastCheckCloseTime: time.Now(),
}
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol,
currentDataID)
return connection
@@ -173,7 +175,6 @@ func (p *PartitionContext) OnConnectionClose(event
*events.SocketCloseEvent, clo
}
connection := conn.(*PartitionConnection)
connection.closeCallback = closeCallback
- connection.closed = true
log.Debugf("receive the connection close event and mark is closable,
connection ID: %d, random ID: %d, partition number: %d",
event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
}
@@ -227,8 +228,9 @@ func (p *PartitionContext) Consume(data interface{}) {
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
- log.Debugf("receive the socket data event, connection ID: %d,
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
- event.ConnectionID, event.RandomID, pid, event.DataID0,
event.Sequence0, event.Protocol0)
+ log.Debugf("receive the socket data event, connection ID: %d,
random ID: %d, pid: %d, prev data id: %d, "+
+ "data id: %d, sequence: %d, protocol: %d",
+ event.ConnectionID, event.RandomID, pid,
event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
connection := p.getConnectionContext(event.ConnectionID,
event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
diff --git a/pkg/accesslog/collector/tls.go b/pkg/accesslog/collector/tls.go
index 41c8811..0d14a58 100644
--- a/pkg/accesslog/collector/tls.go
+++ b/pkg/accesslog/collector/tls.go
@@ -18,6 +18,8 @@
package collector
import (
+ "sync"
+
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
@@ -33,6 +35,7 @@ type TLSCollector struct {
context *common.AccessLogContext
monitoredProcesses map[int32]bool
linker *btf.Linker
+ mutex sync.Mutex
}
func NewTLSCollector() *TLSCollector {
@@ -57,6 +60,14 @@ func (c *TLSCollector) Stop() {
}
func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
+ go func() {
+ c.addProcess(pid)
+ }()
+}
+
+func (c *TLSCollector) addProcess(pid int32) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
if _, ok := c.monitoredProcesses[pid]; ok {
return
}
@@ -83,5 +94,7 @@ func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
}
func (c *TLSCollector) OnProcessRemoved(pid int32) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
delete(c.monitoredProcesses, pid)
}
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index 7c0e992..afac079 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -57,6 +57,9 @@ const (
// in case the reading the data from BPF queue is disordered, so add a
delay time to delete the connection information
connectionDeleteDelayTime = time.Second * 20
+
+ // the connection check exist time
+ connectionCheckExistTime = time.Second * 30
)
type addressProcessType int
@@ -144,13 +147,14 @@ type addressInfo struct {
}
type ConnectionInfo struct {
- ConnectionID uint64
- RandomID uint64
- RPCConnection *v3.AccessLogConnection
- MarkDeletable bool
- PID uint32
- Socket *ip.SocketPair
- DeleteAfter *time.Time
+ ConnectionID uint64
+ RandomID uint64
+ RPCConnection *v3.AccessLogConnection
+ MarkDeletable bool
+ PID uint32
+ Socket *ip.SocketPair
+ LastCheckExistTime time.Time
+ DeleteAfter *time.Time
}
func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
@@ -192,8 +196,10 @@ func (c *ConnectionManager) Start(ctx context.Context,
accessLogContext *AccessL
// if the connection is not existed,
then delete it
if err :=
c.activeConnectionMap.Delete(conID); err != nil {
- log.Warnf("failed to delete the
active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error:
%v",
- pid, fd, conID,
activateConn.RandomID, err)
+ if !errors.Is(err,
ebpf.ErrKeyNotExist) {
+ log.Warnf("failed to
delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID:
%d, error: %v",
+ pid, fd, conID,
activateConn.RandomID, err)
+ }
continue
}
log.Debugf("deleted the active
connection as not exist in file system, pid: %d, fd: %d, connection ID: %d,
random ID: %d",
@@ -318,17 +324,20 @@ func (c *ConnectionManager)
connectionPostHandle(connection *ConnectionInfo, eve
c.allUnfinishedConnections[fmt.Sprintf("%d_%d",
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
}
case events.SocketDetail:
+ tlsMode := connection.RPCConnection.TlsMode
+ protocol := connection.RPCConnection.Protocol
if e.GetSSL() == 1 && connection.RPCConnection.TlsMode ==
v3.AccessLogConnectionTLSMode_Plain {
- connection.RPCConnection.TlsMode =
v3.AccessLogConnectionTLSMode_TLS
+ tlsMode = v3.AccessLogConnectionTLSMode_TLS
}
if e.GetProtocol() != enums.ConnectionProtocolUnknown &&
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
switch e.GetProtocol() {
case enums.ConnectionProtocolHTTP:
- connection.RPCConnection.Protocol =
v3.AccessLogProtocolType_HTTP_1
+ protocol = v3.AccessLogProtocolType_HTTP_1
case enums.ConnectionProtocolHTTP2:
- connection.RPCConnection.Protocol =
v3.AccessLogProtocolType_HTTP_2
+ protocol = v3.AccessLogProtocolType_HTTP_2
}
}
+ c.rebuildRPCConnectionWithTLSModeAndProtocol(connection,
tlsMode, protocol)
}
// notify all flush listeners the connection is ready to flush
@@ -337,6 +346,22 @@ func (c *ConnectionManager)
connectionPostHandle(connection *ConnectionInfo, eve
}
}
+// According to https://github.com/golang/protobuf/issues/1609
+// if the message is modified during marshaling, it may cause the error when
send the message to the backend
+// so, we need to clone the message and change it before sending it to the
channel
+func (c *ConnectionManager)
rebuildRPCConnectionWithTLSModeAndProtocol(connection *ConnectionInfo,
+ tls v3.AccessLogConnectionTLSMode, protocol v3.AccessLogProtocolType) {
+ original := connection.RPCConnection
+ connection.RPCConnection = &v3.AccessLogConnection{
+ Local: original.Local,
+ Remote: original.Remote,
+ Role: original.Role,
+ TlsMode: tls,
+ Protocol: protocol,
+ Attachment: original.Attachment,
+ }
+}
+
func (c *ConnectionManager) ProcessIsMonitor(pid uint32) bool {
c.monitoringProcessLock.RLock()
defer c.monitoringProcessLock.RUnlock()
@@ -360,11 +385,12 @@ func (c *ConnectionManager) buildConnection(event
*events.SocketConnectEvent, so
Protocol: v3.AccessLogProtocolType_TCP,
}
return &ConnectionInfo{
- ConnectionID: event.ConID,
- RandomID: event.RandomID,
- RPCConnection: connection,
- PID: event.PID,
- Socket: socket,
+ ConnectionID: event.ConID,
+ RandomID: event.RandomID,
+ RPCConnection: connection,
+ PID: event.PID,
+ Socket: socket,
+ LastCheckExistTime: time.Now(),
}
}
@@ -503,6 +529,15 @@ func (c *ConnectionManager) AddNewProcess(pid int32,
entities []api.ProcessInter
defer c.monitoringProcessLock.Unlock()
// adding monitoring process and IP addresses
+ var entity *api.ProcessEntity
+ if len(entities) > 0 {
+ entity = entities[0].Entity()
+ }
+ log.Infof("adding monitoring process, pid: %d, entity: %v", pid, entity)
+ if _, ok := c.monitoringProcesses[pid]; ok {
+ log.Infof("the process %d already monitoring, so no needs to
add again", pid)
+ return
+ }
c.monitoringProcesses[pid] = monitorProcesses
c.updateMonitorStatusForProcess(pid, true)
for _, entity := range monitorProcesses {
@@ -545,6 +580,29 @@ func (c *ConnectionManager)
shouldMonitorProcesses(entities []api.ProcessInterfa
return c.monitorFilter.ShouldIncludeProcesses(entities)
}
+func (c *ConnectionManager) checkConnectionIsExist(con *ConnectionInfo) bool {
+ // skip the check if the check time is not reach
+ if time.Since(con.LastCheckExistTime) < connectionCheckExistTime {
+ return true
+ }
+ con.LastCheckExistTime = time.Now()
+ var activateConn ActiveConnection
+ if err := c.activeConnectionMap.Lookup(con.ConnectionID,
&activateConn); err != nil {
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ con.MarkDeletable = true
+ return false
+ }
+ log.Warnf("cannot found the active connection: %d-%d, err: %v",
con.ConnectionID, con.RandomID, err)
+ return false
+ } else if activateConn.RandomID != 0 && activateConn.RandomID !=
con.RandomID {
+ log.Debugf("detect the connection: %d-%d is already closed(by
difference random ID), so remove from the connection manager",
+ con.ConnectionID, con.RandomID)
+ con.MarkDeletable = true
+ return false
+ }
+ return true
+}
+
func (c *ConnectionManager) RemoveProcess(pid int32, entities
[]api.ProcessInterface) {
c.monitoringProcessLock.Lock()
defer c.monitoringProcessLock.Unlock()
@@ -623,7 +681,7 @@ func (c *ConnectionManager)
updateMonitorStatusForProcess(pid int32, monitor boo
func (c *ConnectionManager) OnBuildConnectionLogFinished() {
// delete all connections which marked as deletable
// all deletable connection events been sent
- deletableConnections := make(map[string]bool, 0)
+ deletableConnections := make(map[string]bool)
now := time.Now()
c.connections.IterCb(func(key string, v interface{}) {
con, ok := v.(*ConnectionInfo)
@@ -632,6 +690,9 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
}
// already mark as deletable or process not monitoring
shouldDelete := con.MarkDeletable ||
!c.ProcessIsMonitor(con.PID)
+ if !shouldDelete {
+ shouldDelete = !c.checkConnectionIsExist(con)
+ }
if shouldDelete && con.DeleteAfter == nil {
deleteAfterTime := now.Add(connectionDeleteDelayTime)
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 83e4ba4..6ecfd2f 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -36,6 +36,7 @@ type SocketDataUploadEvent struct {
ConnectionID uint64
RandomID uint64
DataID0 uint64
+ PrevDataID0 uint64
TotalSize0 uint64
Buffer [2048]byte
}
@@ -52,6 +53,7 @@ func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
s.ConnectionID = r.ReadUint64()
s.RandomID = r.ReadUint64()
s.DataID0 = r.ReadUint64()
+ s.PrevDataID0 = r.ReadUint64()
s.TotalSize0 = r.ReadUint64()
r.ReadUint8Array(s.Buffer[:], 2048)
}
@@ -96,6 +98,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
return s.DataID0
}
+func (s *SocketDataUploadEvent) PrevDataID() uint64 {
+ return s.PrevDataID0
+}
+
func (s *SocketDataUploadEvent) DataSequence() int {
return int(s.Sequence0)
}
diff --git a/pkg/accesslog/events/events_test.go
b/pkg/accesslog/events/events_test.go
index 7a4fc51..1da9f3e 100644
--- a/pkg/accesslog/events/events_test.go
+++ b/pkg/accesslog/events/events_test.go
@@ -78,120 +78,117 @@ b2 2d 26 7c 5a 30 02 00 5e 34 26 7c 5a 30 02 00
03 00 02 01 00 00 2a 06 9c 5c fc 7b 5a 30 02 00
20 c5 fd 7b 5a 30 02 00 04 00 00 00 7a a1 00 00
4e 56 83 76 00 00 00 00 02 00 00 00 00 00 00 00
-2a 06 00 00 00 00 00 00 16 03 03 00 7a 02 00 00
-76 03 03 b1 01 85 b9 ce d4 95 9d 1e 90 f5 90 c3
-f5 99 70 64 ab 0f 48 a0 fa 0d 8d 69 a3 e6 6d a5
-29 98 d7 20 6f a3 13 ea 3c e1 d7 20 28 1f df 94
-90 27 e4 35 33 55 de 90 64 1c 3e 04 6f 8a ce c3
-71 45 c0 5e 13 01 00 00 2e 00 2b 00 02 03 04 00
-33 00 24 00 1d 00 20 00 f1 14 c3 d8 1c 40 01 8e
-9a d7 06 93 be 1d c0 a8 dd 42 53 96 19 51 ce aa
-a6 80 4c d1 cc f1 15 14 03 03 00 01 01 17 03 03
-00 26 96 c9 a2 ec 5b d9 1b 81 38 a9 a1 b3 4e e3
-a7 ee c2 e3 00 21 44 6a cb 05 d1 cc 3f 59 c7 67
-29 f9 eb a9 b3 88 8b 43 17 03 03 03 8d cc f4 f1
-0c 23 44 96 ea 11 a3 f0 1d 2f 71 e2 29 ec 1f c2
-88 8a 04 a8 59 13 fe fd 45 24 aa 0b 64 b0 67 08
-41 17 39 f6 c7 ad 5d 0a 94 70 d9 89 74 6a 24 5a
-91 7a 8d 92 a7 66 1c f9 10 17 52 0e df 9e 4c 24
-b4 23 1c bb 0c 78 a6 bb 8a 97 46 27 45 ae 3e 01
-83 12 ac 8a 75 12 68 f3 91 37 7e ae a8 41 61 82
-e6 48 a8 65 08 1b ae f5 28 92 b9 3c f8 47 93 77
-a9 f1 f9 a6 ec 67 6f 3d f9 00 df c7 da 43 27 c1
-8a 61 ac 28 6a aa 0d ce 99 25 c3 9e ab ee a7 ff
-d8 0a e8 65 bd a0 4d a9 e8 0f 39 e6 b3 2f 95 ef
-83 dd 31 31 d8 49 df 1a 5d 5c 51 76 7f bd 4b a3
-5d 08 da 25 c8 06 38 ba b4 d2 21 24 33 e1 b2 48
-8b af 2d bd 7b 32 9b 7c e6 b2 72 ba f2 fe 60 62
-db d9 b0 0a aa 34 3d f7 46 3a b2 d0 0a 84 fc 02
-a7 a9 d8 ca db 89 3b f4 a1 f1 de e5 5d 29 73 02
-2b 1b 8d 8d b7 06 1f a2 8e dd d4 6c 1d b8 1a 57
-8b 09 17 96 6f 00 62 75 c3 be 42 68 2a 29 1a 70
-f8 03 5b ae 69 cb 89 8a c1 00 34 d0 90 cf 69 5b
-00 62 26 4a 74 d3 6d 0e 84 38 1a 29 da 7e d9 57
-44 22 75 f1 23 e8 8e e4 cb 80 ec 06 f7 c4 63 cb
-0b ec a5 02 32 50 d9 92 40 f5 89 a7 18 10 79 c1
-fc d3 52 aa 15 8d 28 14 53 32 5c 46 db 4f 00 19
-5e 50 8c 17 e8 0e 36 71 1a 94 53 3c 03 42 0a 05
-8c 7d 7f 4e d3 a1 0b 90 aa a3 f7 9a a5 f9 a2 7f
-36 4d 46 95 df 89 91 ef 01 ec 44 2c d1 79 b7 e8
-3f 1e 56 8e bb b6 fc c1 19 81 78 85 87 88 c4 f1
-64 69 df aa 33 f0 a9 1f aa 54 82 16 1f 4b 99 2b
-18 38 9c bf 26 98 a7 12 f0 a2 04 de ef 98 63 da
-49 ab f7 38 6d 0b 89 45 ee db c0 1e af bf d3 3a
-27 6c 91 7a 9b d0 35 45 e7 65 c5 43 3d 70 68 03
-02 8d 68 c7 3f fe 1d 2b a4 0e 74 28 e9 82 21 9a
-cb b1 b4 9e 91 01 53 89 51 d2 3d 37 b3 16 1c 3e
-d8 5f 84 04 95 3a fc f5 9a b7 00 4c ba 10 72 31
-2f 6d 17 bd b8 9f 48 e5 3e 14 60 61 4c 33 86 a1
-bd 99 34 15 aa 61 39 89 97 91 3f dc 11 f7 25 d0
-5d 80 5e c5 dc 2c 03 d7 ab 2d 90 93 3c e5 f5 3e
-2c 16 76 48 0b 94 b5 00 5e 8f 97 cf 10 2d 46 d3
-50 18 c2 8f 58 ac bd cf 6e 4e 2f 6d cc 71 4f 00
-1d 33 4f 3c 57 06 d6 48 8e 50 a9 e3 19 1d b6 13
-1c 6a 1d 43 88 4d 57 5d e2 be 79 6b 2b 86 0b 52
-01 31 67 1a 59 a0 7f 0c be c4 cb 5e 7e 5e bb 39
-45 2c 68 10 7c 51 39 a0 ed 83 1e 35 1b c4 63 8b
-b5 e2 7b 8a 9d a7 ac 02 a3 fc cc ec c2 db c0 59
-7b db 4b 27 b8 52 38 12 4d 05 38 bd 2b bd 73 c8
-a1 33 c4 da 69 6a ce 32 f4 62 51 c6 87 c2 d8 f8
-45 5b df 9c 18 5a 91 2e c7 f9 44 87 69 0d 44 70
-04 23 f7 da b7 1e 8a 81 c5 28 15 bc b4 83 fb c2
-ef b8 95 b7 37 aa 2d 85 22 8c b8 26 28 7d c7 83
-d5 fe 30 bf 9c a9 44 d1 d4 37 34 5b a4 ff 63 fc
-e6 31 d0 11 6c 4a bd 1d 7a 70 80 25 54 70 d1 44
-45 74 ed b6 50 a5 4e 59 f8 c2 f5 99 3d f9 26 43
-cd 21 7e 72 60 ef 53 03 f3 6e e7 8e 86 68 5f f0
-cc b3 09 64 56 f6 f5 37 53 06 fe ec 3c e7 79 a5
-82 7f e0 d0 5f e3 77 0b 18 4a 03 1e 63 a1 53 64
-df 87 57 40 f5 c2 56 bb 73 cb ce 68 d2 da 6c 0f
-4e 57 06 8d 95 5f a9 6d f0 70 d8 bb 83 85 80 56
-52 a5 3f bc 4a 21 45 89 d4 0a 17 03 03 01 19 4f
-b6 ea 71 c8 69 7d fa 10 21 08 8b 93 b2 d2 06 5b
-2b b5 60 e7 cf 0d 85 ad 3d c4 53 e5 b6 7c d2 35
-e6 97 23 95 fb 61 15 57 3c 4a 67 ec 61 26 4d 58
-ee 08 af 47 f7 90 b3 11 ba 41 6a be 79 db cf 88
-1f d5 04 89 c9 b0 f0 bc 85 30 87 82 88 ee 77 8d
-f9 ff 9d 77 f6 50 03 93 88 ea 62 14 cf 47 d4 ad
-f7 c4 e1 be 46 7b c0 fa ab b1 76 39 50 76 55 e9
-8c c6 c8 a8 13 fa a3 2e 9c 4f 32 7f 9c a4 dc f3
-1d e8 fe 3c be af 6d 21 e4 e0 e4 53 b1 cb 3f 63
-ac d9 d2 17 81 fa 33 88 8d 61 82 40 5f 56 0f 91
-a0 d7 a6 33 fd 59 09 f4 95 99 f6 57 dd d5 32 44
-6f a0 64 2e 74 0a 54 90 65 c2 93 61 18 b4 b0 5e
-15 27 fa 4d 53 e6 1d aa 1b 13 a6 00 d0 b6 98 07
-9a b5 91 03 2f 55 40 69 c0 69 4e 48 33 f1 03 15
-cc f8 d2 0a ad 74 6a 37 5a 1b a8 bb fa 3f 04 8c
-a8 b5 23 a0 50 2b 8f a5 fb 1d e4 1b 2f 11 bf e1
-4c 5a 7b 72 4f f4 d5 65 23 e8 26 22 47 ad 8a e0
-eb 0e b3 ee db 54 7c 23 17 03 03 00 35 8e 6c 95
-11 a0 76 73 22 67 3a 72 b6 02 30 fe 55 94 60 bb
-33 4a c4 fd 7f 6b 00 2c 10 37 4e 29 e8 f7 39 f9
-04 9d 92 97 93 12 ec d7 fe 9c fb 78 95 a2 c1 2d
-74 d2 17 03 03 00 8b df 38 dc a2 d9 44 06 ce 79
-5a 6a e8 9f 97 83 e1 80 c2 84 3b 18 7f 16 d3 9e
-fb 53 c9 03 b1 2b 66 fe 81 06 a8 89 4d a0 e3 64
-f7 39 53 b6 9c d4 4a 38 bc e4 db c8 d7 68 5e f1
-d7 6a 0c 49 4c 5c 28 f6 09 76 8e 15 0b 42 f6 1c
-17 07 05 81 8a 05 23 50 cd b0 a6 a3 89 c9 ac 5d
-35 35 33 15 4a 6f 31 80 a0 ea de 8e 56 e5 16 e5
-d7 f0 e3 f9 09 35 c2 be 9d 74 48 19 39 b8 c9 04
-70 9a 58 22 05 fc 68 78 52 b4 92 ab d2 14 66 97
-45 e6 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+02 00 00 00 00 00 00 00 2a 06 00 00 00 00 00 00
+16 03 03 00 7a 02 00 00 76 03 03 b1 01 85 b9 ce
+d4 95 9d 1e 90 f5 90 c3 f5 99 70 64 ab 0f 48 a0
+fa 0d 8d 69 a3 e6 6d a5 29 98 d7 20 6f a3 13 ea
+3c e1 d7 20 28 1f df 94 90 27 e4 35 33 55 de 90
+64 1c 3e 04 6f 8a ce c3 71 45 c0 5e 13 01 00 00
+2e 00 2b 00 02 03 04 00 33 00 24 00 1d 00 20 00
+f1 14 c3 d8 1c 40 01 8e 9a d7 06 93 be 1d c0 a8
+dd 42 53 96 19 51 ce aa a6 80 4c d1 cc f1 15 14
+03 03 00 01 01 17 03 03 00 26 96 c9 a2 ec 5b d9
+1b 81 38 a9 a1 b3 4e e3 a7 ee c2 e3 00 21 44 6a
+cb 05 d1 cc 3f 59 c7 67 29 f9 eb a9 b3 88 8b 43
+17 03 03 03 8d cc f4 f1 0c 23 44 96 ea 11 a3 f0
+1d 2f 71 e2 29 ec 1f c2 88 8a 04 a8 59 13 fe fd
+45 24 aa 0b 64 b0 67 08 41 17 39 f6 c7 ad 5d 0a
+94 70 d9 89 74 6a 24 5a 91 7a 8d 92 a7 66 1c f9
+10 17 52 0e df 9e 4c 24 b4 23 1c bb 0c 78 a6 bb
+8a 97 46 27 45 ae 3e 01 83 12 ac 8a 75 12 68 f3
+91 37 7e ae a8 41 61 82 e6 48 a8 65 08 1b ae f5
+28 92 b9 3c f8 47 93 77 a9 f1 f9 a6 ec 67 6f 3d
+f9 00 df c7 da 43 27 c1 8a 61 ac 28 6a aa 0d ce
+99 25 c3 9e ab ee a7 ff d8 0a e8 65 bd a0 4d a9
+e8 0f 39 e6 b3 2f 95 ef 83 dd 31 31 d8 49 df 1a
+5d 5c 51 76 7f bd 4b a3 5d 08 da 25 c8 06 38 ba
+b4 d2 21 24 33 e1 b2 48 8b af 2d bd 7b 32 9b 7c
+e6 b2 72 ba f2 fe 60 62 db d9 b0 0a aa 34 3d f7
+46 3a b2 d0 0a 84 fc 02 a7 a9 d8 ca db 89 3b f4
+a1 f1 de e5 5d 29 73 02 2b 1b 8d 8d b7 06 1f a2
+8e dd d4 6c 1d b8 1a 57 8b 09 17 96 6f 00 62 75
+c3 be 42 68 2a 29 1a 70 f8 03 5b ae 69 cb 89 8a
+c1 00 34 d0 90 cf 69 5b 00 62 26 4a 74 d3 6d 0e
+84 38 1a 29 da 7e d9 57 44 22 75 f1 23 e8 8e e4
+cb 80 ec 06 f7 c4 63 cb 0b ec a5 02 32 50 d9 92
+40 f5 89 a7 18 10 79 c1 fc d3 52 aa 15 8d 28 14
+53 32 5c 46 db 4f 00 19 5e 50 8c 17 e8 0e 36 71
+1a 94 53 3c 03 42 0a 05 8c 7d 7f 4e d3 a1 0b 90
+aa a3 f7 9a a5 f9 a2 7f 36 4d 46 95 df 89 91 ef
+01 ec 44 2c d1 79 b7 e8 3f 1e 56 8e bb b6 fc c1
+19 81 78 85 87 88 c4 f1 64 69 df aa 33 f0 a9 1f
+aa 54 82 16 1f 4b 99 2b 18 38 9c bf 26 98 a7 12
+f0 a2 04 de ef 98 63 da 49 ab f7 38 6d 0b 89 45
+ee db c0 1e af bf d3 3a 27 6c 91 7a 9b d0 35 45
+e7 65 c5 43 3d 70 68 03 02 8d 68 c7 3f fe 1d 2b
+a4 0e 74 28 e9 82 21 9a cb b1 b4 9e 91 01 53 89
+51 d2 3d 37 b3 16 1c 3e d8 5f 84 04 95 3a fc f5
+9a b7 00 4c ba 10 72 31 2f 6d 17 bd b8 9f 48 e5
+3e 14 60 61 4c 33 86 a1 bd 99 34 15 aa 61 39 89
+97 91 3f dc 11 f7 25 d0 5d 80 5e c5 dc 2c 03 d7
+ab 2d 90 93 3c e5 f5 3e 2c 16 76 48 0b 94 b5 00
+5e 8f 97 cf 10 2d 46 d3 50 18 c2 8f 58 ac bd cf
+6e 4e 2f 6d cc 71 4f 00 1d 33 4f 3c 57 06 d6 48
+8e 50 a9 e3 19 1d b6 13 1c 6a 1d 43 88 4d 57 5d
+e2 be 79 6b 2b 86 0b 52 01 31 67 1a 59 a0 7f 0c
+be c4 cb 5e 7e 5e bb 39 45 2c 68 10 7c 51 39 a0
+ed 83 1e 35 1b c4 63 8b b5 e2 7b 8a 9d a7 ac 02
+a3 fc cc ec c2 db c0 59 7b db 4b 27 b8 52 38 12
+4d 05 38 bd 2b bd 73 c8 a1 33 c4 da 69 6a ce 32
+f4 62 51 c6 87 c2 d8 f8 45 5b df 9c 18 5a 91 2e
+c7 f9 44 87 69 0d 44 70 04 23 f7 da b7 1e 8a 81
+c5 28 15 bc b4 83 fb c2 ef b8 95 b7 37 aa 2d 85
+22 8c b8 26 28 7d c7 83 d5 fe 30 bf 9c a9 44 d1
+d4 37 34 5b a4 ff 63 fc e6 31 d0 11 6c 4a bd 1d
+7a 70 80 25 54 70 d1 44 45 74 ed b6 50 a5 4e 59
+f8 c2 f5 99 3d f9 26 43 cd 21 7e 72 60 ef 53 03
+f3 6e e7 8e 86 68 5f f0 cc b3 09 64 56 f6 f5 37
+53 06 fe ec 3c e7 79 a5 82 7f e0 d0 5f e3 77 0b
+18 4a 03 1e 63 a1 53 64 df 87 57 40 f5 c2 56 bb
+73 cb ce 68 d2 da 6c 0f 4e 57 06 8d 95 5f a9 6d
+f0 70 d8 bb 83 85 80 56 52 a5 3f bc 4a 21 45 89
+d4 0a 17 03 03 01 19 4f b6 ea 71 c8 69 7d fa 10
+21 08 8b 93 b2 d2 06 5b 2b b5 60 e7 cf 0d 85 ad
+3d c4 53 e5 b6 7c d2 35 e6 97 23 95 fb 61 15 57
+3c 4a 67 ec 61 26 4d 58 ee 08 af 47 f7 90 b3 11
+ba 41 6a be 79 db cf 88 1f d5 04 89 c9 b0 f0 bc
+85 30 87 82 88 ee 77 8d f9 ff 9d 77 f6 50 03 93
+88 ea 62 14 cf 47 d4 ad f7 c4 e1 be 46 7b c0 fa
+ab b1 76 39 50 76 55 e9 8c c6 c8 a8 13 fa a3 2e
+9c 4f 32 7f 9c a4 dc f3 1d e8 fe 3c be af 6d 21
+e4 e0 e4 53 b1 cb 3f 63 ac d9 d2 17 81 fa 33 88
+8d 61 82 40 5f 56 0f 91 a0 d7 a6 33 fd 59 09 f4
+95 99 f6 57 dd d5 32 44 6f a0 64 2e 74 0a 54 90
+65 c2 93 61 18 b4 b0 5e 15 27 fa 4d 53 e6 1d aa
+1b 13 a6 00 d0 b6 98 07 9a b5 91 03 2f 55 40 69
+c0 69 4e 48 33 f1 03 15 cc f8 d2 0a ad 74 6a 37
+5a 1b a8 bb fa 3f 04 8c a8 b5 23 a0 50 2b 8f a5
+fb 1d e4 1b 2f 11 bf e1 4c 5a 7b 72 4f f4 d5 65
+23 e8 26 22 47 ad 8a e0 eb 0e b3 ee db 54 7c 23
+17 03 03 00 35 8e 6c 95 11 a0 76 73 22 67 3a 72
+b6 02 30 fe 55 94 60 bb 33 4a c4 fd 7f 6b 00 2c
+10 37 4e 29 e8 f7 39 f9 04 9d 92 97 93 12 ec d7
+fe 9c fb 78 95 a2 c1 2d 74 d2 17 03 03 00 8b df
+38 dc a2 d9 44 06 ce 79 5a 6a e8 9f 97 83 e1 80
+c2 84 3b 18 7f 16 d3 9e fb 53 c9 03 b1 2b 66 fe
+81 06 a8 89 4d a0 e3 64 f7 39 53 b6 9c d4 4a 38
+bc e4 db c8 d7 68 5e f1 d7 6a 0c 49 4c 5c 28 f6
+09 76 8e 15 0b 42 f6 1c 17 07 05 81 8a 05 23 50
+cd b0 a6 a3 89 c9 ac 5d 35 35 33 15 4a 6f 31 80
+a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
+9d 74 48 19 39 b8 c9 04 70 9a 58 22 05 fc 68 78
+52 b4 92 ab d2 14 66 97 45 e6 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
@@ -206,7 +203,11 @@ d7 f0 e3 f9 09 35 c2 be 9d 74 48 19 39 b8 c9 04
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
-00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+00 00 00 00
`,
create: func() reader.EventReader {
return &SocketDataUploadEvent{}
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index 18577de..d483172 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -32,14 +32,12 @@ import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
+ "github.com/apache/skywalking-rover/pkg/accesslog/sender"
"github.com/apache/skywalking-rover/pkg/core"
"github.com/apache/skywalking-rover/pkg/core/backend"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
- "github.com/apache/skywalking-rover/pkg/process"
- "github.com/apache/skywalking-rover/pkg/tools/host"
- v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
@@ -53,8 +51,8 @@ type Runner struct {
mgr *module.Manager
backendOp backend.Operator
cluster string
- alsClient v3.EBPFAccessLogServiceClient
ctx context.Context
+ sender *sender.GRPCSender
}
func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) {
@@ -70,17 +68,18 @@ func NewRunner(mgr *module.Manager, config *common.Config)
(*Runner, error) {
backendOP := coreModule.BackendOperator()
clusterName := coreModule.ClusterName()
monitorFilter :=
common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","),
strings.Split(config.ExcludeClusters, ","))
+ connectionMgr := common.NewConnectionManager(config, mgr, bpfLoader,
monitorFilter)
runner := &Runner{
context: &common.AccessLogContext{
BPF: bpfLoader,
Config: config,
- ConnectionMgr: common.NewConnectionManager(config, mgr,
bpfLoader, monitorFilter),
+ ConnectionMgr: connectionMgr,
},
collectors: collector.Collectors(),
mgr: mgr,
backendOp: backendOP,
cluster: clusterName,
- alsClient:
v3.NewEBPFAccessLogServiceClient(backendOP.GetConnection()),
+ sender: sender.NewGRPCSender(mgr, connectionMgr),
}
runner.context.Queue = common.NewQueue(config.Flush.MaxCountOneStream,
flushDuration, runner)
return runner, nil
@@ -91,6 +90,7 @@ func (r *Runner) Start(ctx context.Context) error {
r.context.RuntimeContext = ctx
r.context.Queue.Start(ctx)
r.context.ConnectionMgr.Start(ctx, r.context)
+ r.sender.Start(ctx)
for _, c := range r.collectors {
err := c.Start(r.mgr, r.context)
if err != nil {
@@ -111,26 +111,20 @@ func (r *Runner) Consume(kernels chan *common.KernelLog,
protocols chan *common.
return
}
- allLogs := r.buildConnectionLogs(kernels, protocols)
- log.Debugf("ready to send access log, connection count: %d",
len(allLogs))
- if len(allLogs) == 0 {
- return
- }
- if err := r.sendLogs(allLogs); err != nil {
- log.Warnf("send access log failure: %v", err)
- }
+ batch := r.sender.NewBatch()
+ r.buildConnectionLogs(batch, kernels, protocols)
+ log.Debugf("ready to send access log, connection count: %d",
batch.ConnectionCount())
+ r.sender.AddBatch(batch)
}
-func (r *Runner) buildConnectionLogs(kernels chan *common.KernelLog, protocols
chan *common.ProtocolLog) map[*common.ConnectionInfo]*connectionLogs {
- result := make(map[*common.ConnectionInfo]*connectionLogs)
- r.buildKernelLogs(kernels, result)
- r.buildProtocolLogs(protocols, result)
+func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan
*common.KernelLog, protocols chan *common.ProtocolLog) {
+ r.buildKernelLogs(kernels, batch)
+ r.buildProtocolLogs(protocols, batch)
r.context.ConnectionMgr.OnBuildConnectionLogFinished()
- return result
}
-func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, result
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch
*sender.BatchLogs) {
delayAppends := make([]*common.KernelLog, 0)
for {
select {
@@ -139,13 +133,7 @@ func (r *Runner) buildKernelLogs(kernels chan
*common.KernelLog, result map[*com
log.Debugf("building kernel log result, connetaion ID:
%d, random ID: %d, exist connection: %t, delay: %t",
kernelLog.Event.GetConnectionID(),
kernelLog.Event.GetRandomID(), connection != nil, delay)
if connection != nil && curLog != nil {
- logs, exist := result[connection]
- if !exist {
- logs = newConnectionLogs()
- result[connection] = logs
- }
-
- logs.kernels = append(logs.kernels, curLog)
+ batch.AppendKernelLog(connection, curLog)
} else if delay {
delayAppends = append(delayAppends, kernelLog)
}
@@ -162,7 +150,7 @@ func (r *Runner) buildKernelLogs(kernels chan
*common.KernelLog, result map[*com
}
}
-func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, result
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch
*sender.BatchLogs) {
delayAppends := make([]*common.ProtocolLog, 0)
for {
select {
@@ -178,15 +166,7 @@ func (r *Runner) buildProtocolLogs(protocols chan
*common.ProtocolLog, result ma
conID, randomID, connection != nil,
delay)
}
if connection != nil && len(kernelLogs) > 0 &&
protocolLogs != nil {
- logs, exist := result[connection]
- if !exist {
- logs = newConnectionLogs()
- result[connection] = logs
- }
- logs.protocols = append(logs.protocols,
&connectionProtocolLog{
- kernels: kernelLogs,
- protocol: protocolLogs,
- })
+ batch.AppendProtocolLog(connection, kernelLogs,
protocolLogs)
} else if delay {
delayAppends = append(delayAppends, protocolLog)
}
@@ -203,95 +183,6 @@ func (r *Runner) buildProtocolLogs(protocols chan
*common.ProtocolLog, result ma
}
}
-func (r *Runner) sendLogs(allLogs map[*common.ConnectionInfo]*connectionLogs)
error {
- timeout, cancelFunc := context.WithTimeout(r.ctx, time.Second*20)
- defer cancelFunc()
- streaming, err := r.alsClient.Collect(timeout)
- if err != nil {
- return err
- }
-
- firstLog := true
- firstConnection := true
- for connection, logs := range allLogs {
- if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
- continue
- }
- if log.Enable(logrus.DebugLevel) {
- log.Debugf("ready to sending access log with
connection, connection ID: %d, random ID: %d, "+
- "local: %s, remote: %s, role: %s, kernel logs
count: %d, protocol log count: %d",
- connection.ConnectionID, connection.RandomID,
connection.RPCConnection.Local, connection.RPCConnection.Remote,
- connection.RPCConnection.Role,
len(logs.kernels), len(logs.protocols))
- }
-
- if len(logs.kernels) > 0 {
- r.sendLogToTheStream(streaming,
r.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels,
nil))
- firstLog, firstConnection = false, false
- }
- for _, protocolLog := range logs.protocols {
- r.sendLogToTheStream(streaming,
r.buildAccessLogMessage(firstLog, firstConnection, connection,
protocolLog.kernels, protocolLog.protocol))
- firstLog, firstConnection = false, false
- }
-
- firstConnection = true
- }
-
- if _, err := streaming.CloseAndRecv(); err != nil {
- log.Warnf("closing the access log streaming error: %v", err)
- }
- return nil
-}
-
-func (r *Runner) sendLogToTheStream(streaming
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) {
- if err := streaming.Send(logMsg); err != nil {
- log.Warnf("send access log failure: %v", err)
- }
-}
-
-func (r *Runner) buildAccessLogMessage(firstLog, firstConnection bool, conn
*common.ConnectionInfo,
- kernelLogs []*v3.AccessLogKernelLog, protocolLog
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
- var rpcCon *v3.AccessLogConnection
- if firstConnection {
- rpcCon = conn.RPCConnection
- }
- return &v3.EBPFAccessLogMessage{
- Node: r.BuildNodeInfo(firstLog),
- Connection: rpcCon,
- KernelLogs: kernelLogs,
- ProtocolLog: protocolLog,
- }
-}
-
-func (r *Runner) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
- if !needs {
- return nil
- }
- netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
- for i, n := range host.AllNetworkInterfaces() {
- netInterfaces = append(netInterfaces,
&v3.EBPFAccessLogNodeNetInterface{
- Index: int32(i),
- Mtu: int32(n.MTU),
- Name: n.Name,
- })
- }
- return &v3.EBPFAccessLogNodeInfo{
- Name:
r.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
- NetInterfaces: netInterfaces,
- BootTime: r.convertTimeToInstant(host.BootTime),
- ClusterName: r.cluster,
- Policy: &v3.EBPFAccessLogPolicy{
- ExcludeNamespaces:
r.context.ConnectionMgr.GetExcludeNamespaces(),
- },
- }
-}
-
-func (r *Runner) convertTimeToInstant(t time.Time) *v32.Instant {
- return &v32.Instant{
- Seconds: t.Unix(),
- Nanos: int32(t.Nanosecond()),
- }
-}
-
func (r *Runner) shouldReportProcessLog(pid uint32) bool {
// if the process not monitoring, then check the process is existed or
not
if r.context.ConnectionMgr.ProcessIsMonitor(pid) {
@@ -364,20 +255,3 @@ func (r *Runner) Stop() error {
r.context.ConnectionMgr.Stop()
return nil
}
-
-type connectionLogs struct {
- kernels []*v3.AccessLogKernelLog
- protocols []*connectionProtocolLog
-}
-
-type connectionProtocolLog struct {
- kernels []*v3.AccessLogKernelLog
- protocol *v3.AccessLogProtocolLogs
-}
-
-func newConnectionLogs() *connectionLogs {
- return &connectionLogs{
- kernels: make([]*v3.AccessLogKernelLog, 0),
- protocols: make([]*connectionProtocolLog, 0),
- }
-}
diff --git a/pkg/accesslog/sender/logs.go b/pkg/accesslog/sender/logs.go
new file mode 100644
index 0000000..95f2710
--- /dev/null
+++ b/pkg/accesslog/sender/logs.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+ "github.com/apache/skywalking-rover/pkg/accesslog/common"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var maxLogsPerSend = 10_000
+
+type BatchLogs struct {
+ logs map[*common.ConnectionInfo]*ConnectionLogs
+}
+
+func newBatchLogs() *BatchLogs {
+ return &BatchLogs{
+ logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+ }
+}
+
+func (l *BatchLogs) ConnectionCount() int {
+ return len(l.logs)
+}
+
+func (l *BatchLogs) AppendKernelLog(connection *common.ConnectionInfo, log
*v3.AccessLogKernelLog) {
+ logs, ok := l.logs[connection]
+ if !ok {
+ logs = newConnectionLogs()
+ l.logs[connection] = logs
+ }
+
+ logs.kernels = append(logs.kernels, log)
+}
+
+func (l *BatchLogs) AppendProtocolLog(connection *common.ConnectionInfo,
kernels []*v3.AccessLogKernelLog, protocols *v3.AccessLogProtocolLogs) {
+ logs, ok := l.logs[connection]
+ if !ok {
+ logs = newConnectionLogs()
+ l.logs[connection] = logs
+ }
+
+ logs.protocols = append(logs.protocols, &ConnectionProtocolLog{
+ kernels: kernels,
+ protocol: protocols,
+ })
+}
+
+func (l *BatchLogs) splitBatchLogs() []*BatchLogs {
+ logsCount := len(l.logs)
+ if logsCount == 0 {
+ return nil
+ }
+ splitCount := logsCount / maxLogsPerSend
+ if logsCount%maxLogsPerSend != 0 {
+ splitCount++
+ }
+ result := make([]*BatchLogs, 0, splitCount)
+
+ // split the connections by maxLogsPerSend
+ currentCount := 0
+ var currentBatch *BatchLogs
+ for connection, logs := range l.logs {
+ if currentCount%maxLogsPerSend == 0 {
+ currentBatch = newBatchLogs()
+ result = append(result, currentBatch)
+ currentCount = 0
+ }
+ currentBatch.logs[connection] = logs
+ currentCount++
+ }
+
+ return result
+}
+
+type ConnectionLogs struct {
+ kernels []*v3.AccessLogKernelLog
+ protocols []*ConnectionProtocolLog
+}
+
+type ConnectionProtocolLog struct {
+ kernels []*v3.AccessLogKernelLog
+ protocol *v3.AccessLogProtocolLogs
+}
+
+func newConnectionLogs() *ConnectionLogs {
+ return &ConnectionLogs{
+ kernels: make([]*v3.AccessLogKernelLog, 0),
+ protocols: make([]*ConnectionProtocolLog, 0),
+ }
+}
diff --git a/pkg/accesslog/sender/sender.go b/pkg/accesslog/sender/sender.go
new file mode 100644
index 0000000..c003e59
--- /dev/null
+++ b/pkg/accesslog/sender/sender.go
@@ -0,0 +1,236 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+ "container/list"
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/accesslog/common"
+ "github.com/apache/skywalking-rover/pkg/core"
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process"
+ "github.com/apache/skywalking-rover/pkg/tools/host"
+
+ "github.com/sirupsen/logrus"
+
+ v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var log = logger.GetLogger("accesslog", "sender")
+
+// GRPCSender Async to sending the access log to the backend
+type GRPCSender struct {
+ logs *list.List
+ notify chan bool
+ mutex sync.Mutex
+ ctx context.Context
+
+ mgr *module.Manager
+ connectionMgr *common.ConnectionManager
+ alsClient v3.EBPFAccessLogServiceClient
+ clusterName string
+}
+
+// NewGRPCSender creates a new GRPCSender
+func NewGRPCSender(mgr *module.Manager, connectionMgr
*common.ConnectionManager) *GRPCSender {
+ return &GRPCSender{
+ logs: list.New(),
+ notify: make(chan bool, 1),
+ mgr: mgr,
+ connectionMgr: connectionMgr,
+ clusterName:
mgr.FindModule(core.ModuleName).(core.Operator).ClusterName(),
+ alsClient:
v3.NewEBPFAccessLogServiceClient(mgr.FindModule(core.ModuleName).(core.Operator).
+ BackendOperator().GetConnection()),
+ }
+}
+
+func (g *GRPCSender) Start(ctx context.Context) {
+ g.ctx = ctx
+ go func() {
+ for {
+ select {
+ case <-g.notify:
+ if count, err := g.handleLogs(); err != nil {
+ log.Warnf("sending access log error,
lost %d logs, error: %v", count, err)
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+func (g *GRPCSender) NewBatch() *BatchLogs {
+ return &BatchLogs{
+ logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+ }
+}
+
+func (g *GRPCSender) AddBatch(batch *BatchLogs) {
+ // split logs
+ splitLogs := batch.splitBatchLogs()
+
+ // append the resend logs
+ g.mutex.Lock()
+ defer g.mutex.Unlock()
+ for _, l := range splitLogs {
+ g.logs.PushBack(l)
+ }
+
+ // notify the sender
+ select {
+ case g.notify <- true:
+ default:
+ }
+}
+
+func (g *GRPCSender) handleLogs() (int, error) {
+ for {
+ // pop logs
+ logs := g.popLogs()
+ if logs == nil {
+ return 0, nil
+ }
+ // send logs
+ now := time.Now()
+ if err := g.sendLogs(logs); err != nil {
+ return len(logs.logs), err
+ }
+ log.Infof("sending access log success, connection count: %d,
use time: %s",
+ logs.ConnectionCount(), time.Since(now).String())
+ }
+}
+
+func (g *GRPCSender) sendLogs(batch *BatchLogs) error {
+ timeout, cancelFunc := context.WithTimeout(g.ctx, time.Second*20)
+ defer cancelFunc()
+ streaming, err := g.alsClient.Collect(timeout)
+ if err != nil {
+ return err
+ }
+
+ firstLog := true
+ firstConnection := true
+ var sendError error
+ for connection, logs := range batch.logs {
+ if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
+ continue
+ }
+ if log.Enable(logrus.DebugLevel) {
+ log.Debugf("ready to sending access log with
connection, connection ID: %d, random ID: %d, "+
+ "local: %s, remote: %s, role: %s, contains
ztunnel address: %t, kernel logs count: %d, protocol log count: %d",
+ connection.ConnectionID, connection.RandomID,
connection.RPCConnection.Local, connection.RPCConnection.Remote,
+ connection.RPCConnection.Role,
connection.RPCConnection.Attachment != nil, len(logs.kernels),
len(logs.protocols))
+ }
+
+ if len(logs.kernels) > 0 {
+ sendError = g.sendLogToTheStream(streaming,
+ g.buildAccessLogMessage(firstLog,
firstConnection, connection, logs.kernels, nil))
+ firstLog, firstConnection = false, false
+ }
+ for _, protocolLog := range logs.protocols {
+ sendError = g.sendLogToTheStream(streaming,
+ g.buildAccessLogMessage(firstLog,
firstConnection, connection, protocolLog.kernels, protocolLog.protocol))
+ firstLog, firstConnection = false, false
+ }
+ if sendError != nil {
+ g.closeStream(streaming)
+ return fmt.Errorf("sending access log error: %v",
sendError)
+ }
+
+ firstConnection = true
+ }
+
+ g.closeStream(streaming)
+ return nil
+}
+
+func (g *GRPCSender) closeStream(s v3.EBPFAccessLogService_CollectClient) {
+ if _, err := s.CloseAndRecv(); err != nil {
+ log.Warnf("closing the access log streaming error: %v", err)
+ }
+}
+
+func (g *GRPCSender) sendLogToTheStream(streaming
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) error {
+ if err := streaming.Send(logMsg); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (g *GRPCSender) buildAccessLogMessage(firstLog, firstConnection bool,
conn *common.ConnectionInfo,
+ kernelLogs []*v3.AccessLogKernelLog, protocolLog
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
+ var rpcCon *v3.AccessLogConnection
+ if firstConnection {
+ rpcCon = conn.RPCConnection
+ }
+ return &v3.EBPFAccessLogMessage{
+ Node: g.BuildNodeInfo(firstLog),
+ Connection: rpcCon,
+ KernelLogs: kernelLogs,
+ ProtocolLog: protocolLog,
+ }
+}
+
+func (g *GRPCSender) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
+ if !needs {
+ return nil
+ }
+ netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
+ for i, n := range host.AllNetworkInterfaces() {
+ netInterfaces = append(netInterfaces,
&v3.EBPFAccessLogNodeNetInterface{
+ Index: int32(i),
+ Mtu: int32(n.MTU),
+ Name: n.Name,
+ })
+ }
+ return &v3.EBPFAccessLogNodeInfo{
+ Name:
g.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
+ NetInterfaces: netInterfaces,
+ BootTime: g.convertTimeToInstant(host.BootTime),
+ ClusterName: g.clusterName,
+ Policy: &v3.EBPFAccessLogPolicy{
+ ExcludeNamespaces:
g.connectionMgr.GetExcludeNamespaces(),
+ },
+ }
+}
+
+func (g *GRPCSender) convertTimeToInstant(t time.Time) *v32.Instant {
+ return &v32.Instant{
+ Seconds: t.Unix(),
+ Nanos: int32(t.Nanosecond()),
+ }
+}
+
+func (g *GRPCSender) popLogs() *BatchLogs {
+ g.mutex.Lock()
+ defer g.mutex.Unlock()
+ if g.logs.Len() == 0 {
+ return nil
+ }
+ e := g.logs.Front()
+ logs := e.Value.(*BatchLogs)
+ g.logs.Remove(e)
+ return logs
+}
diff --git a/pkg/logger/settings.go b/pkg/logger/settings.go
index a56c6ab..7705bb9 100644
--- a/pkg/logger/settings.go
+++ b/pkg/logger/settings.go
@@ -17,7 +17,9 @@
package logger
-import "github.com/sirupsen/logrus"
+import (
+ "github.com/sirupsen/logrus"
+)
const (
DefaultLoggerLevel = logrus.InfoLevel
diff --git a/pkg/profiling/task/network/analyze/base/connection.go
b/pkg/profiling/task/network/analyze/base/connection.go
index 2393f35..169381e 100644
--- a/pkg/profiling/task/network/analyze/base/connection.go
+++ b/pkg/profiling/task/network/analyze/base/connection.go
@@ -44,7 +44,8 @@ type ConnectionContext struct {
Metrics *ConnectionMetricsContext
// Flush the data content to the oap count
- FlushDataCount int
+ FlushDataCount int
+ DeleteFlushCount int
}
func (c *AnalyzerContext) NewConnectionContext(conID, randomID uint64, pid, fd
uint32, processes []api.ProcessInterface,
diff --git a/pkg/profiling/task/network/analyze/base/context.go
b/pkg/profiling/task/network/analyze/base/context.go
index de60778..56e42da 100644
--- a/pkg/profiling/task/network/analyze/base/context.go
+++ b/pkg/profiling/task/network/analyze/base/context.go
@@ -319,6 +319,7 @@ func (c *AnalyzerContext)
lookupTheActiveConnectionInBPf(connection *ConnectionC
func (c *AnalyzerContext) deleteConnectionOnly(ccs []string) {
for _, cc := range ccs {
+ log.Debugf("delete the closed connection in activate
connections context: %s", cc)
c.activeConnections.Remove(cc)
}
}
@@ -401,8 +402,17 @@ func (c *AnalyzerContext) flushClosedConnection()
[]*ConnectionContext {
c.closedConnectionLocker.Lock()
defer c.closedConnectionLocker.Unlock()
+ // encase the connection still have buffer not flush, then keep it
+ keepClosedConnections := make([]*ConnectionContext, 0)
connections := c.closedConnections
- c.closedConnections = make([]*ConnectionContext, 0)
+ for _, connection := range connections {
+ if connection.DeleteFlushCount > 1 {
+ continue
+ }
+ keepClosedConnections = append(keepClosedConnections,
connection)
+ connection.DeleteFlushCount++
+ }
+ c.closedConnections = keepClosedConnections
return connections
}
@@ -411,4 +421,6 @@ func (c *AnalyzerContext) appendClosedConnection(con
*ConnectionContext) {
defer c.closedConnectionLocker.RUnlock()
c.closedConnections = append(c.closedConnections, con)
+ log.Debugf("append the closed connection to the closed connection list,
conid: %d, randomid: %d",
+ con.ConnectionID, con.RandomID)
}
diff --git a/pkg/profiling/task/network/analyze/events/data.go
b/pkg/profiling/task/network/analyze/events/data.go
index 1f71fa8..bc451fc 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -35,6 +35,7 @@ type SocketDataUploadEvent struct {
ConnectionID uint64
RandomID uint64
DataID0 uint64
+ PrevDataID0 uint64
TotalSize0 uint64
Buffer [2048]byte
}
@@ -79,6 +80,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 {
return s.DataID0
}
+func (s *SocketDataUploadEvent) PrevDataID() uint64 {
+ return s.PrevDataID0
+}
+
func (s *SocketDataUploadEvent) DataSequence() int {
return int(s.Sequence0)
}
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go
b/pkg/profiling/task/network/analyze/layer4/listener.go
index e9e4caa..7358734 100644
--- a/pkg/profiling/task/network/analyze/layer4/listener.go
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -108,8 +108,9 @@ func (l *Listener) PreFlushConnectionMetrics(ccs
[]*base.ConnectionWithBPF, bpfL
keyWithContext[l.generateConID(connection.ConnectionID,
connection.RandomID)] = connection
if log.Enable(logrus.DebugLevel) {
- log.Debugf("found connection: %d, %s relation:
%s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, is_closed: %t, write: %d
bytes/%d, read: %d bytes/%d",
- connection.ConnectionID,
connection.Role.String(),
+ log.Debugf("found connection: %d_%d, %s relation:
%s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, "+
+ "is_closed: %t, write: %d bytes/%d, read: %d
bytes/%d",
+ connection.ConnectionID, connection.RandomID,
connection.Role.String(),
connection.LocalIP, connection.LocalPort,
connection.LocalPid, connection.RemoteIP, connection.RemotePort,
enums.ConnectionProtocolString(connection.Protocol), connection.IsSSL,
connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
layer4.WriteCounter.Cur.Count,
layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go
b/pkg/profiling/task/network/analyze/layer7/events.go
index 707aaa8..511a088 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -29,7 +29,7 @@ import (
)
func (l *Listener) initSocketDataQueue(parallels, queueSize int, config
*profiling.TaskConfig) {
- l.socketDataQueue = btf.NewEventQueue(parallels, queueSize, func(num
int) btf.PartitionContext {
+ l.socketDataQueue = btf.NewEventQueue("socket data resolver",
parallels, queueSize, func(num int) btf.PartitionContext {
return NewSocketDataPartitionContext(l, config)
})
}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 1acc342..188707e 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -172,6 +172,7 @@ func (a *ProtocolAnalyzer)
processConnectionEvents(connection *connectionInfo) {
// reset the status for prepare reading
metrics := connection.metrics
connectionID := connection.connectionID
+ randomID := connection.randomID
connection.buffer.ResetForLoopReading()
// loop to read the protocol data
for {
@@ -181,7 +182,7 @@ func (a *ProtocolAnalyzer)
processConnectionEvents(connection *connectionInfo) {
return
}
- result := a.protocol.ParseProtocol(connectionID, metrics,
connection.buffer)
+ result := a.protocol.ParseProtocol(connectionID, randomID,
metrics, connection.buffer)
finishReading := false
switch result {
case enums.ParseResultSuccess:
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
index 3ca7dcb..5dfcca3 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
@@ -31,7 +31,7 @@ type Protocol interface {
GenerateMetrics() Metrics
Init(config *profiling.TaskConfig)
- ParseProtocol(connectionID uint64, metrics Metrics, reader
*buffer.Buffer) enums.ParseResult
+ ParseProtocol(connectionID, randomID uint64, metrics Metrics, reader
*buffer.Buffer) enums.ParseResult
PackageMaxExpireDuration() time.Duration
UpdateExtensionConfig(config *profiling.ExtensionConfig)
}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 6cec0ef..84ffc1d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -90,7 +90,7 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
h.sampleConfig = NewSamplingConfig(config)
}
-func (h *Analyzer) ParseProtocol(connectionID uint64, metrics
protocol.Metrics, buf *buffer.Buffer) enums.ParseResult {
+func (h *Analyzer) ParseProtocol(connectionID, randomID uint64, metrics
protocol.Metrics, buf *buffer.Buffer) enums.ParseResult {
connectionMetrics := metrics.(*ConnectionMetrics)
messageType, err := h.reader.IdentityMessageType(buf)
if err != nil {
@@ -102,11 +102,14 @@ func (h *Analyzer) ParseProtocol(connectionID uint64,
metrics protocol.Metrics,
case reader.MessageTypeRequest:
result, err = h.handleRequest(connectionMetrics, buf)
case reader.MessageTypeResponse:
- result, err = h.handleResponse(connectionID, connectionMetrics,
buf)
+ result, err = h.handleResponse(connectionID, randomID,
connectionMetrics, buf)
case reader.MessageTypeUnknown:
return enums.ParseResultSkipPackage
}
+ log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+
+ "connection ID: %d, random ID: %d, metrics : %p, handle result:
%d",
+ messageType, buf, buf.Position().DataID(), connectionID,
randomID, metrics, result)
if err != nil {
log.Warnf("reading %v error: %v", messageType, err)
return enums.ParseResultSkipPackage
@@ -130,10 +133,13 @@ func (h *Analyzer) handleRequest(metrics
*ConnectionMetrics, buf *buffer.Buffer)
return enums.ParseResultSuccess, nil
}
-func (h *Analyzer) handleResponse(connectionID uint64, metrics
*ConnectionMetrics, buf *buffer.Buffer) (enums.ParseResult, error) {
+func (h *Analyzer) handleResponse(connectionID, randomID uint64, metrics
*ConnectionMetrics,
+ buf *buffer.Buffer) (enums.ParseResult, error) {
// find the first request
firstElement := metrics.halfData.Front()
if firstElement == nil {
+ log.Debugf("cannot found request for response, skip response,
connection ID: %d, random ID: %d, "+
+ "current data id: %d", connectionID, randomID,
buf.Position().DataID())
return enums.ParseResultSkipPackage, nil
}
request := metrics.halfData.Remove(firstElement).(*reader.Request)
@@ -162,7 +168,8 @@ func (h *Analyzer) handleResponse(connectionID uint64,
metrics *ConnectionMetric
if log.Enable(logrus.DebugLevel) {
metricsJSON, _ := json.Marshal(data)
- log.Debugf("generated metrics, connection id: %d, side: %s,
metrisc: %s", connectionID, side.String(), string(metricsJSON))
+ log.Debugf("generated metrics, connection id: %d, random id:
%d, side: %s, metrisc: %s, metrics pointer: %p",
+ connectionID, randomID, side.String(),
string(metricsJSON), metrics)
}
return enums.ParseResultSuccess, nil
}
@@ -217,8 +224,8 @@ func (m *ConnectionMetrics)
MergeMetricsFromConnection(connection *base.Connecti
if log.Enable(logrus.DebugLevel) {
clientMetrics, _ := json.Marshal(m.clientMetrics)
serverMetrics, _ := json.Marshal(m.serverMetrics)
- log.Debugf("combine metrics: conid: %d_%d, client side metrics:
%s, server side metrics: %s",
- connection.ConnectionID, connection.RandomID,
clientMetrics, serverMetrics)
+ log.Debugf("combine metrics: conid: %d_%d, porinters: %p-%p,
client side metrics: %s, server side metrics: %s",
+ connection.ConnectionID, connection.RandomID, m, other,
clientMetrics, serverMetrics)
}
}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 7224727..f966743 100644
---
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -57,6 +57,13 @@ func (r *Request) MinDataID() int {
return int(r.headerBuffer.FirstSocketBuffer().DataID())
}
+func (r *Request) MaxDataID() int {
+ if r.bodyBuffer != nil {
+ return int(r.bodyBuffer.LastSocketBuffer().DataID())
+ }
+ return int(r.headerBuffer.LastSocketBuffer().DataID())
+}
+
func (r *Request) Original() *http.Request {
return r.original
}
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 307f29f..0e3bdb0 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -20,7 +20,6 @@ package btf
import (
"bytes"
"embed"
- "errors"
"fmt"
"path/filepath"
"sync"
@@ -62,7 +61,6 @@ func GetEBPFCollectionOptionsIfNeed(bpfSpec
*ebpf.CollectionSpec) *ebpf.Collecti
spec = readSpec
})
- enhanceDataQueueOpts(bpfSpec)
return &ebpf.CollectionOptions{Programs:
ebpf.ProgramOptions{KernelTypes: spec}}
}
@@ -98,31 +96,3 @@ func getKernelBTFAddress() (spec *btf.Spec, err error) {
func asset(file string) ([]byte, error) {
return assets.ReadFile(filepath.ToSlash(file))
}
-
-func validateGlobalConstVoidPtrVar(t btf.Type) error {
- btfVar, ok := t.(*btf.Var)
- if !ok {
- return errors.New("not of type btf.Var")
- }
-
- if btfVar.Linkage != btf.GlobalVar {
- return fmt.Errorf("%q is not a global variable", btfVar.Name)
- }
-
- btfPtr, ok := btfVar.Type.(*btf.Pointer)
- if !ok {
- return fmt.Errorf("%q is not a pointer", btfVar.Name)
- }
-
- btfConst, ok := btfPtr.Target.(*btf.Const)
- if !ok {
- return fmt.Errorf("%q is not const", btfVar.Name)
- }
-
- _, ok = btfConst.Type.(*btf.Void)
- if !ok {
- return fmt.Errorf("%q is not a const void pointer", btfVar.Name)
- }
-
- return nil
-}
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 867d84e..bdbd7a6 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -83,6 +83,7 @@ type Linker struct {
closeOnce sync.Once
linkedUProbes map[string]bool
+ linkMutex sync.Mutex
}
func NewLinker() *Linker {
@@ -136,6 +137,8 @@ func (m *Linker) AddSysCallWithKProbe(call string, linkK
LinkFunc, p *ebpf.Progr
if p == nil {
return
}
+ m.linkMutex.Lock()
+ defer m.linkMutex.Unlock()
kprobe, err := linkK(syscallPrefix+call, p, nil)
if err != nil {
@@ -147,10 +150,13 @@ func (m *Linker) AddSysCallWithKProbe(call string, linkK
LinkFunc, p *ebpf.Progr
}
func (m *Linker) AddTracePoint(sys, name string, p *ebpf.Program) {
+ m.linkMutex.Lock()
+ defer m.linkMutex.Unlock()
l, e := link.Tracepoint(sys, name, p, nil)
if e != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open %s
error: %v", name, e))
} else {
+ log.Debugf("attach to the tracepoint: sys: %s, name: %s", sys,
name)
m.closers = append(m.closers, l)
}
}
@@ -264,6 +270,8 @@ func (u *UProbeExeFile) AddLinkWithType(symbol string,
enter bool, p *ebpf.Progr
}
func (u *UProbeExeFile) addLinkWithType0(symbol string, enter bool, p
*ebpf.Program, customizeAddress uint64) (link.Link, error) {
+ u.linker.linkMutex.Lock()
+ defer u.linker.linkMutex.Unlock()
// check already linked
uprobeIdentity := fmt.Sprintf("%s_%s_%t_%d", u.addr, symbol, enter,
customizeAddress)
if u.linker.linkedUProbes[uprobeIdentity] {
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 290f717..dc78465 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -21,61 +21,17 @@ import (
"context"
"fmt"
"hash/fnv"
- "os"
- "strings"
"sync"
+ "time"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
)
-const dataQueuePrefix = "rover_data_queue_"
-
-var (
- ringbufChecker sync.Once
- ringbufAvailable bool
-)
-
-func isRingbufAvailable() bool {
- ringbufChecker.Do(func() {
- buf, err := ebpf.NewMap(&ebpf.MapSpec{
- Type: ebpf.RingBuf,
- MaxEntries: uint32(os.Getpagesize()),
- })
-
- buf.Close()
-
- ringbufAvailable = err == nil
-
- if ringbufAvailable {
- log.Infof("detect the ring buffer is available in
current system for enhancement of data queue")
- }
- })
-
- return ringbufAvailable
-}
-
-func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) {
- it := bpfSpec.Types.Iterate()
- for it.Next() {
- if !strings.HasPrefix(it.Type.TypeName(), dataQueuePrefix) {
- continue
- }
- if err := validateGlobalConstVoidPtrVar(it.Type); err != nil {
- panic(fmt.Errorf("invalid global const void ptr var %s:
%v", it.Type.TypeName(), err))
- }
-
- // if the ringbuf not available, use perf event array
- if !isRingbufAvailable() {
- mapName := strings.TrimPrefix(it.Type.TypeName(),
dataQueuePrefix)
- mapSpec := bpfSpec.Maps[mapName]
- mapSpec.Type = ebpf.PerfEventArray
- mapSpec.KeySize = 4
- mapSpec.ValueSize = 4
- }
- }
-}
+// queueChannelReducingCountCheckInterval is the interval to check the queue
channel reducing count
+// if the reducing count is almost full, then added a warning log
+const queueChannelReducingCountCheckInterval = time.Second * 5
type queueReader interface {
Read() ([]byte, error)
@@ -125,6 +81,7 @@ func (p *perfQueueReader) Close() error {
type ringBufReader struct {
reader *ringbuf.Reader
+ name string
}
func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
@@ -132,7 +89,7 @@ func newRingBufReader(emap *ebpf.Map) (*ringBufReader,
error) {
if err != nil {
return nil, err
}
- return &ringBufReader{reader: reader}, nil
+ return &ringBufReader{reader: reader, name: emap.String()}, nil
}
func (r *ringBufReader) Read() ([]byte, error) {
@@ -153,6 +110,7 @@ type PartitionContext interface {
}
type EventQueue struct {
+ name string
count int
receivers []*mapReceiver
partitions []*partition
@@ -168,12 +126,12 @@ type mapReceiver struct {
parallels int
}
-func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator
func(partitionNum int) PartitionContext) *EventQueue {
+func NewEventQueue(name string, partitionCount, sizePerPartition int,
contextGenerator func(partitionNum int) PartitionContext) *EventQueue {
partitions := make([]*partition, 0)
for i := 0; i < partitionCount; i++ {
partitions = append(partitions, newPartition(i,
sizePerPartition, contextGenerator(i)))
}
- return &EventQueue{count: partitionCount, partitions: partitions}
+ return &EventQueue{name: name, count: partitionCount, partitions:
partitions}
}
func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize,
parallels int, dataSupplier func() interface{},
@@ -236,6 +194,27 @@ func (e *EventQueue) start0(ctx context.Context, linker
*Linker) {
}
}(ctx, i)
}
+
+ // channel reducing count check
+ go func() {
+ ticker := time.NewTicker(queueChannelReducingCountCheckInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ for _, p := range e.partitions {
+ reducingCount := len(p.channel)
+ if reducingCount > cap(p.channel)*9/10 {
+ log.Warnf("queue %s partition
%d reducing count is almost full, "+
+ "please trying to
increase the parallels count or queue size, status: %d/%d",
+ e.name, p.index,
reducingCount, cap(p.channel))
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
}
func (e *EventQueue) routerTransformer(data interface{}, routeGenerator
func(data interface{}) string) {
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index c16a9fd..7a74fc8 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -20,6 +20,7 @@ package buffer
import (
"container/list"
"errors"
+ "fmt"
"io"
"sync"
"time"
@@ -55,6 +56,8 @@ type SocketDataBuffer interface {
BufferLen() int
// DataID data id of the buffer
DataID() uint64
+ // PrevDataID the previous data id of the buffer
+ PrevDataID() uint64
// DataSequence the data sequence under same data id
DataSequence() int
// IsStart this buffer is start of the same data id
@@ -83,11 +86,19 @@ type DataIDRange struct {
IsToBufferReadFinished bool
}
+func (i *DataIDRange) String() string {
+ return fmt.Sprintf("from: %d, to: %d, isToBufferReadFinished: %t",
i.From, i.To, i.IsToBufferReadFinished)
+}
+
type Buffer struct {
dataEvents *list.List
detailEvents *list.List
validated bool // the events list is validated or not
+ // shouldResetPosition means the buffer have new buffer appended into
the buffer
+ // so the position should reset to the head, and re-read the buffer
+ shouldResetPosition bool
+
eventLocker sync.RWMutex
head *Position
@@ -184,6 +195,10 @@ func (p *Position) DataID() uint64 {
return p.element.Value.(SocketDataBuffer).DataID()
}
+func (p *Position) PrevDataID() uint64 {
+ return p.element.Value.(SocketDataBuffer).PrevDataID()
+}
+
func (p *Position) Seq() int {
return p.element.Value.(SocketDataBuffer).DataSequence()
}
@@ -609,6 +624,12 @@ func (r *Buffer) PrepareForReading() bool {
if r.dataEvents.Len() == 0 {
return false
}
+ // if the buffer should reset, then reset the position and cannot be
read from current position
+ if r.shouldResetPosition {
+ r.ResetForLoopReading()
+ r.shouldResetPosition = false
+ return false
+ }
if r.head == nil || r.head.element == nil {
// read in the first element
r.eventLocker.RLock()
@@ -741,8 +762,16 @@ func (r *Buffer) AppendDataEvent(event SocketDataBuffer) {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
+ defer func() {
+ // if the current position is not nil and the current reading
data id is bigger than the event data id
+ if r.current != nil && r.current.DataID() > event.DataID() {
+ r.shouldResetPosition = true
+ }
+ }()
+
if r.dataEvents.Len() == 0 {
r.dataEvents.PushFront(event)
+ r.shouldResetPosition = true
return
}
if r.dataEvents.Back().Value.(SocketDataBuffer).DataID() <
event.DataID() {
@@ -802,6 +831,8 @@ func (r *Buffer) DeleteExpireEvents(expireDuration
time.Duration) int {
}
func (r *Buffer) DataLength() int {
+ r.eventLocker.RLock()
+ defer r.eventLocker.RUnlock()
if r.dataEvents == nil {
return 0
}
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index b1357cd..72dc1b9 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -21,10 +21,10 @@ import (
"net"
"syscall"
- "github.com/apache/skywalking-rover/pkg/logger"
-
"github.com/florianl/go-conntrack"
+ "github.com/apache/skywalking-rover/pkg/logger"
+
"golang.org/x/sys/unix"
)
@@ -36,9 +36,6 @@ var numberStrategies = []struct {
}{{
name: "tcp",
proto: syscall.IPPROTO_TCP,
-}, {
- name: "udp",
- proto: syscall.IPPROTO_UDP,
}}
type ConnTrack struct {
@@ -77,19 +74,6 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair)
bool {
return true
}
}
-
- // using dump to query protocol
- dump, e := c.tracker.Dump(conntrack.Conntrack, family)
- if e != nil {
- log.Debug("cannot dump the conntrack session, error: ", e)
- return false
- }
- if res := c.filterValidateReply(dump, tuple); res != nil {
- addr.DestIP = res.Src.String()
- log.Debugf("found the connection from the dump all conntrack,
src: %s:%d, dst: %s:%d, proto number: %d",
- tuple.Src, *tuple.Proto.SrcPort, tuple.Dst,
*tuple.Proto.DstPort, *tuple.Proto.Number)
- return true
- }
return false
}
diff --git a/scripts/build/bash/btfgen.sh b/scripts/build/bash/btfgen.sh
index 8441e73..304e443 100644
--- a/scripts/build/bash/btfgen.sh
+++ b/scripts/build/bash/btfgen.sh
@@ -26,6 +26,7 @@ BPF_SO_PATTERN="^bpf\_[a-z0-9]+(_[a-z0-9]+)?\.o"
echo "btfhub-archive is a big archive project, maybe take some times..."
git clone --depth 1 https://github.com/aquasecurity/btfhub $TMPDIR/btfhub
git clone --depth 1 https://github.com/aquasecurity/btfhub-archive/
$TMPDIR/btfhub-archive/
+mkdir -p $TMPDIR/btfhub/archive/
mv $TMPDIR/btfhub-archive/* $TMPDIR/btfhub/archive/
each_all_bpf_so_file() {