This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch access_log_improvement
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit d62cbdd953d0ee9a1d96dde97aa11bde137bfcab Author: mrproliu <[email protected]> AuthorDate: Tue Dec 17 16:07:42 2024 +0800 Improve the performance of access log module --- CHANGES.md | 5 +- bpf/accesslog/common/connection.h | 6 +- bpf/include/queue.h | 26 +-- bpf/include/socket_data.h | 29 ++- pkg/accesslog/collector/connection.go | 2 +- pkg/accesslog/collector/protocols/http1.go | 29 ++- pkg/accesslog/collector/protocols/queue.go | 22 +- pkg/accesslog/collector/tls.go | 13 ++ pkg/accesslog/common/connection.go | 98 +++++++-- pkg/accesslog/events/data.go | 6 + pkg/accesslog/runner.go | 161 ++------------ pkg/accesslog/sender/logs.go | 107 ++++++++++ pkg/accesslog/sender/sender.go | 234 +++++++++++++++++++++ pkg/logger/settings.go | 4 +- pkg/profiling/task/network/analyze/events/data.go | 5 + .../task/network/analyze/layer7/events.go | 2 +- .../layer7/protocols/http1/reader/request.go | 7 + pkg/tools/btf/ebpf.go | 1 - pkg/tools/btf/linker.go | 8 + pkg/tools/btf/queue.go | 79 +++---- pkg/tools/buffer/buffer.go | 31 +++ pkg/tools/ip/conntrack.go | 19 +- 22 files changed, 618 insertions(+), 276 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a27176d..8cb3d99 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -16,7 +16,10 @@ Release Notes. * Support parallel parsing protocol data in the access log module. * Upgrade Go library to `1.22`, eBPF library to `0.16.0`. * Reduce missing details issue in the access log module. -* Introduce ringbuf queue to improve performance in the access log module. +* Improve HTTP/1.x protocol parsing strategy to encase missing data. +* Add gRPC sender to sending the access log to the backend. +* Add warning log when the event queue almost full in the access log module. +* Reduce unessential `conntrack` query when detect new connection. #### Bug Fixes * Fix the base image cannot run in the arm64. diff --git a/bpf/accesslog/common/connection.h b/bpf/accesslog/common/connection.h index cb799e8..880826d 100644 --- a/bpf/accesslog/common/connection.h +++ b/bpf/accesslog/common/connection.h @@ -22,6 +22,7 @@ #include "data_args.h" #include "socket_opts.h" #include "queue.h" +#include "socket_data.h" // syscall:connect struct connect_args_t { @@ -107,7 +108,7 @@ struct socket_connect_event_t { __u64 conntrack_upstream_iph; __u32 conntrack_upstream_port; }; -DATA_QUEUE(socket_connection_event_queue, 1024 * 1024); +DATA_QUEUE(socket_connection_event_queue); // active connection cached into the hashmap // if connection closed, then deleted @@ -159,7 +160,7 @@ struct socket_close_event_t { // close success __u32 success; }; -DATA_QUEUE(socket_close_event_queue, 1024 * 1024); +DATA_QUEUE(socket_close_event_queue); static __inline bool family_should_trace(const __u32 family) { return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? 
false : true; @@ -303,6 +304,7 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru } static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) { + bpf_map_delete_elem(&socket_data_last_id_map, &conid); struct socket_close_event_t *close_event; close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event)); if (close_event == NULL) { diff --git a/bpf/include/queue.h b/bpf/include/queue.h index c6b532c..559aeb4 100644 --- a/bpf/include/queue.h +++ b/bpf/include/queue.h @@ -19,12 +19,10 @@ #include "api.h" -#define DATA_QUEUE(name, size) \ - struct { \ - __uint(type, BPF_MAP_TYPE_RINGBUF); \ - __uint(max_entries, size); \ - } name SEC(".maps"); \ - const void *rover_data_queue_##name __attribute__((unused)); +#define DATA_QUEUE(name) \ + struct { \ + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);\ + } name SEC(".maps"); struct { __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); @@ -36,26 +34,12 @@ struct { static __always_inline void *rover_reserve_buf(void *map, __u64 size) { static const int zero = 0; - if (bpf_core_enum_value_exists(enum bpf_func_id, - BPF_FUNC_ringbuf_reserve)) - return bpf_ringbuf_reserve(map, size, 0); - return bpf_map_lookup_elem(&rover_data_heap, &zero); } -static __always_inline void rover_discard_buf(void *buf) -{ - if (bpf_core_enum_value_exists(enum bpf_func_id, - BPF_FUNC_ringbuf_discard)) - bpf_ringbuf_discard(buf, 0); +static __always_inline void rover_discard_buf(void *buf) { } static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) { - if (bpf_core_enum_value_exists(enum bpf_func_id, - BPF_FUNC_ringbuf_submit)) { - bpf_ringbuf_submit(buf, 0); - return 0; - } - return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size); } \ No newline at end of file diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h index 0d62ccb..248b34c 100644 --- a/bpf/include/socket_data.h +++ b/bpf/include/socket_data.h @@ -35,6 +35,7 @@ struct socket_data_upload_event { __u64 conid; __u64 randomid; __u64 data_id; + __u64 prev_data_id; __u64 total_size; char buffer[MAX_TRANSMIT_SOCKET_READ_LENGTH]; }; @@ -44,7 +45,7 @@ struct { __type(value, struct socket_data_upload_event); __uint(max_entries, 1); } socket_data_upload_event_per_cpu_map SEC(".maps"); -DATA_QUEUE(socket_data_upload_queue, 1024 * 1024); +DATA_QUEUE(socket_data_upload_queue); struct socket_data_sequence_t { __u64 data_id; @@ -81,6 +82,7 @@ struct upload_data_args { __u64 random_id; __u64 socket_data_id; + __u64 prev_socket_data_id; struct iovec *socket_data_iovec; size_t socket_data_iovlen; ssize_t bytes_count; @@ -129,11 +131,13 @@ static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 ind socket_data_event->randomid = args->random_id; socket_data_event->total_size = args->bytes_count; socket_data_event->data_id = args->socket_data_id; + socket_data_event->prev_data_id = args->prev_socket_data_id; socket_data_event->sequence = index; socket_data_event->data_len = size; socket_data_event->finished = is_finished; socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk; + asm volatile("%[size] &= 0x7ff;\n" ::[size] "+r"(size) :); bpf_probe_read(&socket_data_event->buffer, size, buf); rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event)); } @@ -208,15 +212,38 @@ static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, 
UPLOAD_PER_SOCKET_DATA_IOV(); } +struct socket_data_last_id_t { + __u64 random_id; + __u64 socket_data_id; +}; +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, 10000); + __type(key, __u64); + __type(value, struct socket_data_last_id_t); +} socket_data_last_id_map SEC(".maps"); + static __inline void upload_socket_data(void *ctx, struct upload_data_args *args) { // must have protocol and ssl must same(plain) // if the connection data is needs to skip upload, then skip if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) { return; } + struct socket_data_last_id_t *latest = bpf_map_lookup_elem(&socket_data_last_id_map, &args->con_id); + args->prev_socket_data_id = 0; + if (latest != NULL && latest->random_id == args->random_id) { + args->prev_socket_data_id = latest->socket_data_id; + } if (args->socket_data_buf != NULL) { upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished); } else if (args->socket_data_iovec != NULL) { upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args); } + + if (latest == NULL || latest->socket_data_id != args->socket_data_id) { + struct socket_data_last_id_t data = {}; + data.random_id = args->random_id; + data.socket_data_id = args->socket_data_id; + bpf_map_update_elem(&socket_data_last_id_map, &args->con_id, &data, BPF_ANY); + } } \ No newline at end of file diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go index a5565a1..d861447 100644 --- a/pkg/accesslog/collector/connection.go +++ b/pkg/accesslog/collector/connection.go @@ -77,7 +77,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext if err != nil { connectionLogger.Warnf("cannot create the connection tracker, %v", err) } - c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels, + c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { return newConnectionPartitionContext(ctx, track) }) diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go index 9c17160..227422f 100644 --- a/pkg/accesslog/collector/protocols/http1.go +++ b/pkg/accesslog/collector/protocols/http1.go @@ -90,9 +90,6 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe } messageType, err := p.reader.IdentityMessageType(buf) - log.Debugf("ready to reading message type, messageType: %v, buf: %p, data id: %d, "+ - "connection ID: %d, random ID: %d, error: %v", messageType, buf, buf.Position().DataID(), - metrics.ConnectionID, metrics.RandomID, err) if err != nil { http1Log.Debugf("failed to identity message type, %v", err) if buf.SkipCurrentElement() { @@ -115,6 +112,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err) } + http1Log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+ + "connection ID: %d, random ID: %d, metrics : %p, handle result: %d", + messageType, buf, buf.Position().DataID(), metrics.ConnectionID, metrics.RandomID, metrics, result) finishReading := false switch result { case enums.ParseResultSuccess: @@ -149,13 +149,13 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf 
*buffer.Buffer) } func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) { - firstRequest := metrics.halfRequests.Front() - if firstRequest == nil { - log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d", - metrics.ConnectionID, metrics.RandomID) + request := metrics.findMatchesRequest(b.Position().DataID(), b.Position().PrevDataID()) + if request == nil { + log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d, "+ + "required prev data id: %d, current data id: %d", + metrics.ConnectionID, metrics.RandomID, b.Position().PrevDataID(), b.Position().DataID()) return enums.ParseResultSkipPackage, nil } - request := metrics.halfRequests.Remove(firstRequest).(*reader.Request) // parsing response response, result, err := p.reader.ReadResponse(request, b, true) @@ -286,3 +286,16 @@ func (m *HTTP1Metrics) appendRequestToList(req *reader.Request) { m.halfRequests.PushBack(req) } } + +func (m *HTTP1Metrics) findMatchesRequest(currentDataID, prevDataID uint64) *reader.Request { + for element := m.halfRequests.Front(); element != nil; element = element.Next() { + req := element.Value.(*reader.Request) + // if the tail data id of request is equals to the prev data id of response + // or tail request data id+1==first response data id, then return the request + if uint64(req.MaxDataID()) == prevDataID || uint64(req.MaxDataID()+1) == currentDataID { + m.halfRequests.Remove(element) + return req + } + } + return nil +} diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index 7e07eb3..c9789b7 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -88,7 +88,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) { } func (q *AnalyzeQueue) Start(ctx context.Context) { - q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize, + q.eventQueue = btf.NewEventQueue("socket data analyzer", + q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize, func(num int) btf.PartitionContext { return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context)) }) @@ -128,12 +129,13 @@ type PartitionContext struct { func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection { connection := &PartitionConnection{ - connectionID: conID, - randomID: randomID, - dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer), - protocol: make(map[enums.ConnectionProtocol]uint64), - protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol), - protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics), + connectionID: conID, + randomID: randomID, + dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer), + protocol: make(map[enums.ConnectionProtocol]uint64), + protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol), + protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics), + lastCheckCloseTime: time.Now(), } connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID) return connection @@ -173,7 +175,6 @@ func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent, clo } connection := conn.(*PartitionConnection) connection.closeCallback = closeCallback - connection.closed = true 
log.Debugf("receive the connection close event and mark is closable, connection ID: %d, random ID: %d, partition number: %d", event.GetConnectionID(), event.GetRandomID(), p.partitionNum) } @@ -227,8 +228,9 @@ func (p *PartitionContext) Consume(data interface{}) { connection.AppendDetail(p.context, event) case *events.SocketDataUploadEvent: pid, _ := events.ParseConnectionID(event.ConnectionID) - log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d", - event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol0) + log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, prev data id: %d, "+ + "data id: %d, sequence: %d, protocol: %d", + event.ConnectionID, event.RandomID, pid, event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0) connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0) connection.AppendData(event) } diff --git a/pkg/accesslog/collector/tls.go b/pkg/accesslog/collector/tls.go index 41c8811..0d14a58 100644 --- a/pkg/accesslog/collector/tls.go +++ b/pkg/accesslog/collector/tls.go @@ -18,6 +18,8 @@ package collector import ( + "sync" + "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/module" @@ -33,6 +35,7 @@ type TLSCollector struct { context *common.AccessLogContext monitoredProcesses map[int32]bool linker *btf.Linker + mutex sync.Mutex } func NewTLSCollector() *TLSCollector { @@ -57,6 +60,14 @@ func (c *TLSCollector) Stop() { } func (c *TLSCollector) OnNewProcessMonitoring(pid int32) { + go func() { + c.addProcess(pid) + }() +} + +func (c *TLSCollector) addProcess(pid int32) { + c.mutex.Lock() + defer c.mutex.Unlock() if _, ok := c.monitoredProcesses[pid]; ok { return } @@ -83,5 +94,7 @@ func (c *TLSCollector) OnNewProcessMonitoring(pid int32) { } func (c *TLSCollector) OnProcessRemoved(pid int32) { + c.mutex.Lock() + defer c.mutex.Unlock() delete(c.monitoredProcesses, pid) } diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go index 7c0e992..e948272 100644 --- a/pkg/accesslog/common/connection.go +++ b/pkg/accesslog/common/connection.go @@ -57,6 +57,9 @@ const ( // in case the reading the data from BPF queue is disordered, so add a delay time to delete the connection information connectionDeleteDelayTime = time.Second * 20 + + // the connection check exist time + connectionCheckExistTime = time.Second * 30 ) type addressProcessType int @@ -144,13 +147,14 @@ type addressInfo struct { } type ConnectionInfo struct { - ConnectionID uint64 - RandomID uint64 - RPCConnection *v3.AccessLogConnection - MarkDeletable bool - PID uint32 - Socket *ip.SocketPair - DeleteAfter *time.Time + ConnectionID uint64 + RandomID uint64 + RPCConnection *v3.AccessLogConnection + MarkDeletable bool + PID uint32 + Socket *ip.SocketPair + LastCheckExistTime time.Time + DeleteAfter *time.Time } func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *bpf.Loader, filter MonitorFilter) *ConnectionManager { @@ -192,8 +196,10 @@ func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessL // if the connection is not existed, then delete it if err := c.activeConnectionMap.Delete(conID); err != nil { - log.Warnf("failed to delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: %v", - pid, fd, conID, 
activateConn.RandomID, err) + if !errors.Is(err, ebpf.ErrKeyNotExist) { + log.Warnf("failed to delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: %v", + pid, fd, conID, activateConn.RandomID, err) + } continue } log.Debugf("deleted the active connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, random ID: %d", @@ -318,17 +324,20 @@ func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, eve c.allUnfinishedConnections[fmt.Sprintf("%d_%d", event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished } case events.SocketDetail: + tlsMode := connection.RPCConnection.TlsMode + protocol := connection.RPCConnection.Protocol if e.GetSSL() == 1 && connection.RPCConnection.TlsMode == v3.AccessLogConnectionTLSMode_Plain { - connection.RPCConnection.TlsMode = v3.AccessLogConnectionTLSMode_TLS + tlsMode = v3.AccessLogConnectionTLSMode_TLS } if e.GetProtocol() != enums.ConnectionProtocolUnknown && connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP { switch e.GetProtocol() { case enums.ConnectionProtocolHTTP: - connection.RPCConnection.Protocol = v3.AccessLogProtocolType_HTTP_1 + protocol = v3.AccessLogProtocolType_HTTP_1 case enums.ConnectionProtocolHTTP2: - connection.RPCConnection.Protocol = v3.AccessLogProtocolType_HTTP_2 + protocol = v3.AccessLogProtocolType_HTTP_2 } } + c.rebuildRPCConnectionWithTLSModeAndProtocol(connection, tlsMode, protocol) } // notify all flush listeners the connection is ready to flush @@ -337,6 +346,22 @@ func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, eve } } +// According to https://github.com/golang/protobuf/issues/1609 +// if the message is modified during marshalling, it may cause the error when send the message to the backend +// so, we need to clone the message and change it before sending it to the channel +func (c *ConnectionManager) rebuildRPCConnectionWithTLSModeAndProtocol(connection *ConnectionInfo, + tls v3.AccessLogConnectionTLSMode, protocol v3.AccessLogProtocolType) { + original := connection.RPCConnection + connection.RPCConnection = &v3.AccessLogConnection{ + Local: original.Local, + Remote: original.Remote, + Role: original.Role, + TlsMode: tls, + Protocol: protocol, + Attachment: original.Attachment, + } +} + func (c *ConnectionManager) ProcessIsMonitor(pid uint32) bool { c.monitoringProcessLock.RLock() defer c.monitoringProcessLock.RUnlock() @@ -360,11 +385,12 @@ func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent, so Protocol: v3.AccessLogProtocolType_TCP, } return &ConnectionInfo{ - ConnectionID: event.ConID, - RandomID: event.RandomID, - RPCConnection: connection, - PID: event.PID, - Socket: socket, + ConnectionID: event.ConID, + RandomID: event.RandomID, + RPCConnection: connection, + PID: event.PID, + Socket: socket, + LastCheckExistTime: time.Now(), } } @@ -503,6 +529,15 @@ func (c *ConnectionManager) AddNewProcess(pid int32, entities []api.ProcessInter defer c.monitoringProcessLock.Unlock() // adding monitoring process and IP addresses + var entity *api.ProcessEntity + if entities != nil && len(entities) > 0 { + entity = entities[0].Entity() + } + log.Infof("adding monitoring process, pid: %d, entity: %v", pid, entity) + if _, ok := c.monitoringProcesses[pid]; ok { + log.Infof("the process %d already monitoring, so no needs to add again", pid) + return + } c.monitoringProcesses[pid] = monitorProcesses c.updateMonitorStatusForProcess(pid, true) for _, entity := range monitorProcesses { @@ 
-545,6 +580,30 @@ func (c *ConnectionManager) shouldMonitorProcesses(entities []api.ProcessInterfa return c.monitorFilter.ShouldIncludeProcesses(entities) } +func (c *ConnectionManager) checkConnectionIsExist(con *ConnectionInfo) bool { + // skip the check if the check time is not reach + if time.Since(con.LastCheckExistTime) < connectionCheckExistTime { + return true + } + con.LastCheckExistTime = time.Now() + var activateConn ActiveConnection + c.activeConnectionMap.Lookup(con.ConnectionID, &activateConn) + if err := c.activeConnectionMap.Lookup(con.ConnectionID, &activateConn); err != nil { + if errors.Is(err, ebpf.ErrKeyNotExist) { + con.MarkDeletable = true + return false + } + log.Warnf("cannot found the active connection: %d-%d, err: %v", con.ConnectionID, con.RandomID, err) + return false + } else if activateConn.RandomID != 0 && activateConn.RandomID != con.RandomID { + log.Debugf("detect the connection: %d-%d is already closed(by difference random ID), so remove from the connection manager", + con.ConnectionID, con.RandomID) + con.MarkDeletable = true + return false + } + return true +} + func (c *ConnectionManager) RemoveProcess(pid int32, entities []api.ProcessInterface) { c.monitoringProcessLock.Lock() defer c.monitoringProcessLock.Unlock() @@ -623,7 +682,7 @@ func (c *ConnectionManager) updateMonitorStatusForProcess(pid int32, monitor boo func (c *ConnectionManager) OnBuildConnectionLogFinished() { // delete all connections which marked as deletable // all deletable connection events been sent - deletableConnections := make(map[string]bool, 0) + deletableConnections := make(map[string]bool) now := time.Now() c.connections.IterCb(func(key string, v interface{}) { con, ok := v.(*ConnectionInfo) @@ -632,6 +691,9 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() { } // already mark as deletable or process not monitoring shouldDelete := con.MarkDeletable || !c.ProcessIsMonitor(con.PID) + if !shouldDelete { + shouldDelete = !c.checkConnectionIsExist(con) + } if shouldDelete && con.DeleteAfter == nil { deleteAfterTime := now.Add(connectionDeleteDelayTime) diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go index 83e4ba4..6ecfd2f 100644 --- a/pkg/accesslog/events/data.go +++ b/pkg/accesslog/events/data.go @@ -36,6 +36,7 @@ type SocketDataUploadEvent struct { ConnectionID uint64 RandomID uint64 DataID0 uint64 + PrevDataID0 uint64 TotalSize0 uint64 Buffer [2048]byte } @@ -52,6 +53,7 @@ func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) { s.ConnectionID = r.ReadUint64() s.RandomID = r.ReadUint64() s.DataID0 = r.ReadUint64() + s.PrevDataID0 = r.ReadUint64() s.TotalSize0 = r.ReadUint64() r.ReadUint8Array(s.Buffer[:], 2048) } @@ -96,6 +98,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 { return s.DataID0 } +func (s *SocketDataUploadEvent) PrevDataID() uint64 { + return s.PrevDataID0 +} + func (s *SocketDataUploadEvent) DataSequence() int { return int(s.Sequence0) } diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go index 18577de..bc1a6ad 100644 --- a/pkg/accesslog/runner.go +++ b/pkg/accesslog/runner.go @@ -32,14 +32,11 @@ import ( "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/accesslog/events" "github.com/apache/skywalking-rover/pkg/accesslog/forwarder" + "github.com/apache/skywalking-rover/pkg/accesslog/sender" "github.com/apache/skywalking-rover/pkg/core" "github.com/apache/skywalking-rover/pkg/core/backend" "github.com/apache/skywalking-rover/pkg/logger" 
"github.com/apache/skywalking-rover/pkg/module" - "github.com/apache/skywalking-rover/pkg/process" - "github.com/apache/skywalking-rover/pkg/tools/host" - - v32 "skywalking.apache.org/repo/goapi/collect/common/v3" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) @@ -53,8 +50,8 @@ type Runner struct { mgr *module.Manager backendOp backend.Operator cluster string - alsClient v3.EBPFAccessLogServiceClient ctx context.Context + sender *sender.GRPCSender } func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) { @@ -70,17 +67,18 @@ func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) { backendOP := coreModule.BackendOperator() clusterName := coreModule.ClusterName() monitorFilter := common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","), strings.Split(config.ExcludeClusters, ",")) + connectionMgr := common.NewConnectionManager(config, mgr, bpfLoader, monitorFilter) runner := &Runner{ context: &common.AccessLogContext{ BPF: bpfLoader, Config: config, - ConnectionMgr: common.NewConnectionManager(config, mgr, bpfLoader, monitorFilter), + ConnectionMgr: connectionMgr, }, collectors: collector.Collectors(), mgr: mgr, backendOp: backendOP, cluster: clusterName, - alsClient: v3.NewEBPFAccessLogServiceClient(backendOP.GetConnection()), + sender: sender.NewGRPCSender(mgr, connectionMgr), } runner.context.Queue = common.NewQueue(config.Flush.MaxCountOneStream, flushDuration, runner) return runner, nil @@ -91,6 +89,7 @@ func (r *Runner) Start(ctx context.Context) error { r.context.RuntimeContext = ctx r.context.Queue.Start(ctx) r.context.ConnectionMgr.Start(ctx, r.context) + r.sender.Start(ctx) for _, c := range r.collectors { err := c.Start(r.mgr, r.context) if err != nil { @@ -111,26 +110,20 @@ func (r *Runner) Consume(kernels chan *common.KernelLog, protocols chan *common. 
return } - allLogs := r.buildConnectionLogs(kernels, protocols) - log.Debugf("ready to send access log, connection count: %d", len(allLogs)) - if len(allLogs) == 0 { - return - } - if err := r.sendLogs(allLogs); err != nil { - log.Warnf("send access log failure: %v", err) - } + batch := r.sender.NewBatch() + r.buildConnectionLogs(batch, kernels, protocols) + log.Debugf("ready to send access log, connection count: %d", batch.ConnectionCount()) + r.sender.AddBatch(batch) } -func (r *Runner) buildConnectionLogs(kernels chan *common.KernelLog, protocols chan *common.ProtocolLog) map[*common.ConnectionInfo]*connectionLogs { - result := make(map[*common.ConnectionInfo]*connectionLogs) - r.buildKernelLogs(kernels, result) - r.buildProtocolLogs(protocols, result) +func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan *common.KernelLog, protocols chan *common.ProtocolLog) { + r.buildKernelLogs(kernels, batch) + r.buildProtocolLogs(protocols, batch) r.context.ConnectionMgr.OnBuildConnectionLogFinished() - return result } -func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, result map[*common.ConnectionInfo]*connectionLogs) { +func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch *sender.BatchLogs) { delayAppends := make([]*common.KernelLog, 0) for { select { @@ -139,13 +132,7 @@ func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, result map[*com log.Debugf("building kernel log result, connetaion ID: %d, random ID: %d, exist connection: %t, delay: %t", kernelLog.Event.GetConnectionID(), kernelLog.Event.GetRandomID(), connection != nil, delay) if connection != nil && curLog != nil { - logs, exist := result[connection] - if !exist { - logs = newConnectionLogs() - result[connection] = logs - } - - logs.kernels = append(logs.kernels, curLog) + batch.AppendKernelLog(connection, curLog) } else if delay { delayAppends = append(delayAppends, kernelLog) } @@ -162,7 +149,7 @@ func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, result map[*com } } -func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, result map[*common.ConnectionInfo]*connectionLogs) { +func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch *sender.BatchLogs) { delayAppends := make([]*common.ProtocolLog, 0) for { select { @@ -178,15 +165,7 @@ func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, result ma conID, randomID, connection != nil, delay) } if connection != nil && len(kernelLogs) > 0 && protocolLogs != nil { - logs, exist := result[connection] - if !exist { - logs = newConnectionLogs() - result[connection] = logs - } - logs.protocols = append(logs.protocols, &connectionProtocolLog{ - kernels: kernelLogs, - protocol: protocolLogs, - }) + batch.AppendProtocolLog(connection, kernelLogs, protocolLogs) } else if delay { delayAppends = append(delayAppends, protocolLog) } @@ -203,95 +182,6 @@ func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, result ma } } -func (r *Runner) sendLogs(allLogs map[*common.ConnectionInfo]*connectionLogs) error { - timeout, cancelFunc := context.WithTimeout(r.ctx, time.Second*20) - defer cancelFunc() - streaming, err := r.alsClient.Collect(timeout) - if err != nil { - return err - } - - firstLog := true - firstConnection := true - for connection, logs := range allLogs { - if len(logs.kernels) == 0 && len(logs.protocols) == 0 { - continue - } - if log.Enable(logrus.DebugLevel) { - log.Debugf("ready to sending access log with connection, connection ID: %d, random 
ID: %d, "+ - "local: %s, remote: %s, role: %s, kernel logs count: %d, protocol log count: %d", - connection.ConnectionID, connection.RandomID, connection.RPCConnection.Local, connection.RPCConnection.Remote, - connection.RPCConnection.Role, len(logs.kernels), len(logs.protocols)) - } - - if len(logs.kernels) > 0 { - r.sendLogToTheStream(streaming, r.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels, nil)) - firstLog, firstConnection = false, false - } - for _, protocolLog := range logs.protocols { - r.sendLogToTheStream(streaming, r.buildAccessLogMessage(firstLog, firstConnection, connection, protocolLog.kernels, protocolLog.protocol)) - firstLog, firstConnection = false, false - } - - firstConnection = true - } - - if _, err := streaming.CloseAndRecv(); err != nil { - log.Warnf("closing the access log streaming error: %v", err) - } - return nil -} - -func (r *Runner) sendLogToTheStream(streaming v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) { - if err := streaming.Send(logMsg); err != nil { - log.Warnf("send access log failure: %v", err) - } -} - -func (r *Runner) buildAccessLogMessage(firstLog, firstConnection bool, conn *common.ConnectionInfo, - kernelLogs []*v3.AccessLogKernelLog, protocolLog *v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage { - var rpcCon *v3.AccessLogConnection - if firstConnection { - rpcCon = conn.RPCConnection - } - return &v3.EBPFAccessLogMessage{ - Node: r.BuildNodeInfo(firstLog), - Connection: rpcCon, - KernelLogs: kernelLogs, - ProtocolLog: protocolLog, - } -} - -func (r *Runner) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo { - if !needs { - return nil - } - netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0) - for i, n := range host.AllNetworkInterfaces() { - netInterfaces = append(netInterfaces, &v3.EBPFAccessLogNodeNetInterface{ - Index: int32(i), - Mtu: int32(n.MTU), - Name: n.Name, - }) - } - return &v3.EBPFAccessLogNodeInfo{ - Name: r.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(), - NetInterfaces: netInterfaces, - BootTime: r.convertTimeToInstant(host.BootTime), - ClusterName: r.cluster, - Policy: &v3.EBPFAccessLogPolicy{ - ExcludeNamespaces: r.context.ConnectionMgr.GetExcludeNamespaces(), - }, - } -} - -func (r *Runner) convertTimeToInstant(t time.Time) *v32.Instant { - return &v32.Instant{ - Seconds: t.Unix(), - Nanos: int32(t.Nanosecond()), - } -} - func (r *Runner) shouldReportProcessLog(pid uint32) bool { // if the process not monitoring, then check the process is existed or not if r.context.ConnectionMgr.ProcessIsMonitor(pid) { @@ -364,20 +254,3 @@ func (r *Runner) Stop() error { r.context.ConnectionMgr.Stop() return nil } - -type connectionLogs struct { - kernels []*v3.AccessLogKernelLog - protocols []*connectionProtocolLog -} - -type connectionProtocolLog struct { - kernels []*v3.AccessLogKernelLog - protocol *v3.AccessLogProtocolLogs -} - -func newConnectionLogs() *connectionLogs { - return &connectionLogs{ - kernels: make([]*v3.AccessLogKernelLog, 0), - protocols: make([]*connectionProtocolLog, 0), - } -} diff --git a/pkg/accesslog/sender/logs.go b/pkg/accesslog/sender/logs.go new file mode 100644 index 0000000..95f2710 --- /dev/null +++ b/pkg/accesslog/sender/logs.go @@ -0,0 +1,107 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sender + +import ( + "github.com/apache/skywalking-rover/pkg/accesslog/common" + + v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" +) + +var maxLogsPerSend = 10_000 + +type BatchLogs struct { + logs map[*common.ConnectionInfo]*ConnectionLogs +} + +func newBatchLogs() *BatchLogs { + return &BatchLogs{ + logs: make(map[*common.ConnectionInfo]*ConnectionLogs), + } +} + +func (l *BatchLogs) ConnectionCount() int { + return len(l.logs) +} + +func (l *BatchLogs) AppendKernelLog(connection *common.ConnectionInfo, log *v3.AccessLogKernelLog) { + logs, ok := l.logs[connection] + if !ok { + logs = newConnectionLogs() + l.logs[connection] = logs + } + + logs.kernels = append(logs.kernels, log) +} + +func (l *BatchLogs) AppendProtocolLog(connection *common.ConnectionInfo, kernels []*v3.AccessLogKernelLog, protocols *v3.AccessLogProtocolLogs) { + logs, ok := l.logs[connection] + if !ok { + logs = newConnectionLogs() + l.logs[connection] = logs + } + + logs.protocols = append(logs.protocols, &ConnectionProtocolLog{ + kernels: kernels, + protocol: protocols, + }) +} + +func (l *BatchLogs) splitBatchLogs() []*BatchLogs { + logsCount := len(l.logs) + if logsCount == 0 { + return nil + } + splitCount := logsCount / maxLogsPerSend + if logsCount%maxLogsPerSend != 0 { + splitCount++ + } + result := make([]*BatchLogs, 0, splitCount) + + // split the connections by maxLogsPerSend + currentCount := 0 + var currentBatch *BatchLogs + for connection, logs := range l.logs { + if currentCount%maxLogsPerSend == 0 { + currentBatch = newBatchLogs() + result = append(result, currentBatch) + currentCount = 0 + } + currentBatch.logs[connection] = logs + currentCount++ + } + + return result +} + +type ConnectionLogs struct { + kernels []*v3.AccessLogKernelLog + protocols []*ConnectionProtocolLog +} + +type ConnectionProtocolLog struct { + kernels []*v3.AccessLogKernelLog + protocol *v3.AccessLogProtocolLogs +} + +func newConnectionLogs() *ConnectionLogs { + return &ConnectionLogs{ + kernels: make([]*v3.AccessLogKernelLog, 0), + protocols: make([]*ConnectionProtocolLog, 0), + } +} diff --git a/pkg/accesslog/sender/sender.go b/pkg/accesslog/sender/sender.go new file mode 100644 index 0000000..ad6e921 --- /dev/null +++ b/pkg/accesslog/sender/sender.go @@ -0,0 +1,234 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sender + +import ( + "container/list" + "context" + "fmt" + "sync" + "time" + + "github.com/apache/skywalking-rover/pkg/accesslog/common" + "github.com/apache/skywalking-rover/pkg/core" + "github.com/apache/skywalking-rover/pkg/logger" + "github.com/apache/skywalking-rover/pkg/module" + "github.com/apache/skywalking-rover/pkg/process" + "github.com/apache/skywalking-rover/pkg/tools/host" + + "github.com/sirupsen/logrus" + + v32 "skywalking.apache.org/repo/goapi/collect/common/v3" + v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" +) + +var log = logger.GetLogger("accesslog", "sender") + +// GRPCSender Async to sending the access log to the backend +type GRPCSender struct { + logs *list.List + notify chan bool + mutex sync.Mutex + ctx context.Context + + mgr *module.Manager + connectionMgr *common.ConnectionManager + alsClient v3.EBPFAccessLogServiceClient + clusterName string +} + +// NewGRPCSender creates a new GRPCSender +func NewGRPCSender(mgr *module.Manager, connectionMgr *common.ConnectionManager) *GRPCSender { + return &GRPCSender{ + logs: list.New(), + notify: make(chan bool, 1), + mgr: mgr, + connectionMgr: connectionMgr, + clusterName: mgr.FindModule(core.ModuleName).(core.Operator).ClusterName(), + alsClient: v3.NewEBPFAccessLogServiceClient(mgr.FindModule(core.ModuleName).(core.Operator). + BackendOperator().GetConnection()), + } +} + +func (g *GRPCSender) Start(ctx context.Context) { + g.ctx = ctx + go func() { + for { + select { + case <-g.notify: + if count, err := g.handleLogs(); err != nil { + log.Warnf("sending access log error, lost %d logs, error: %v", count, err) + } + case <-ctx.Done(): + return + } + } + }() +} + +func (g *GRPCSender) NewBatch() *BatchLogs { + return &BatchLogs{ + logs: make(map[*common.ConnectionInfo]*ConnectionLogs), + } +} + +func (g *GRPCSender) AddBatch(batch *BatchLogs) { + // split logs + splitLogs := batch.splitBatchLogs() + + // append the resend logs + g.mutex.Lock() + defer g.mutex.Unlock() + for _, l := range splitLogs { + g.logs.PushBack(l) + } + + // notify the sender + select { + case g.notify <- true: + default: + } +} + +func (g *GRPCSender) handleLogs() (int, error) { + for { + // pop logs + logs := g.popLogs() + if logs == nil { + return 0, nil + } + // send logs + now := time.Now() + if err := g.sendLogs(logs); err != nil { + return len(logs.logs), err + } + log.Infof("sending access log success, connection count: %d, use time: %s", + logs.ConnectionCount(), time.Since(now).String()) + } +} + +func (g *GRPCSender) sendLogs(batch *BatchLogs) error { + timeout, cancelFunc := context.WithTimeout(g.ctx, time.Second*20) + defer cancelFunc() + streaming, err := g.alsClient.Collect(timeout) + if err != nil { + return err + } + + firstLog := true + firstConnection := true + var sendError error + for connection, logs := range batch.logs { + if len(logs.kernels) == 0 && len(logs.protocols) == 0 { + continue + } + if log.Enable(logrus.DebugLevel) { + log.Debugf("ready to sending access log with connection, connection ID: %d, random ID: %d, "+ + "local: %s, remote: %s, role: %s, contains ztunnel 
address: %t, kernel logs count: %d, protocol log count: %d", + connection.ConnectionID, connection.RandomID, connection.RPCConnection.Local, connection.RPCConnection.Remote, + connection.RPCConnection.Role, connection.RPCConnection.Attachment != nil, len(logs.kernels), len(logs.protocols)) + } + + if len(logs.kernels) > 0 { + sendError = g.sendLogToTheStream(streaming, g.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels, nil)) + firstLog, firstConnection = false, false + } + for _, protocolLog := range logs.protocols { + sendError = g.sendLogToTheStream(streaming, g.buildAccessLogMessage(firstLog, firstConnection, connection, protocolLog.kernels, protocolLog.protocol)) + firstLog, firstConnection = false, false + } + if sendError != nil { + g.closeStream(streaming) + return fmt.Errorf("sending access log error: %v", sendError) + } + + firstConnection = true + } + + g.closeStream(streaming) + return nil +} + +func (g *GRPCSender) closeStream(s v3.EBPFAccessLogService_CollectClient) { + if _, err := s.CloseAndRecv(); err != nil { + log.Warnf("closing the access log streaming error: %v", err) + } +} + +func (g *GRPCSender) sendLogToTheStream(streaming v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) error { + if err := streaming.Send(logMsg); err != nil { + return err + } + return nil +} + +func (g *GRPCSender) buildAccessLogMessage(firstLog, firstConnection bool, conn *common.ConnectionInfo, + kernelLogs []*v3.AccessLogKernelLog, protocolLog *v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage { + var rpcCon *v3.AccessLogConnection + if firstConnection { + rpcCon = conn.RPCConnection + } + return &v3.EBPFAccessLogMessage{ + Node: g.BuildNodeInfo(firstLog), + Connection: rpcCon, + KernelLogs: kernelLogs, + ProtocolLog: protocolLog, + } +} + +func (g *GRPCSender) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo { + if !needs { + return nil + } + netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0) + for i, n := range host.AllNetworkInterfaces() { + netInterfaces = append(netInterfaces, &v3.EBPFAccessLogNodeNetInterface{ + Index: int32(i), + Mtu: int32(n.MTU), + Name: n.Name, + }) + } + return &v3.EBPFAccessLogNodeInfo{ + Name: g.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(), + NetInterfaces: netInterfaces, + BootTime: g.convertTimeToInstant(host.BootTime), + ClusterName: g.clusterName, + Policy: &v3.EBPFAccessLogPolicy{ + ExcludeNamespaces: g.connectionMgr.GetExcludeNamespaces(), + }, + } +} + +func (g *GRPCSender) convertTimeToInstant(t time.Time) *v32.Instant { + return &v32.Instant{ + Seconds: t.Unix(), + Nanos: int32(t.Nanosecond()), + } +} + +func (g *GRPCSender) popLogs() *BatchLogs { + g.mutex.Lock() + defer g.mutex.Unlock() + if g.logs.Len() == 0 { + return nil + } + e := g.logs.Front() + logs := e.Value.(*BatchLogs) + g.logs.Remove(e) + return logs +} diff --git a/pkg/logger/settings.go b/pkg/logger/settings.go index a56c6ab..7705bb9 100644 --- a/pkg/logger/settings.go +++ b/pkg/logger/settings.go @@ -17,7 +17,9 @@ package logger -import "github.com/sirupsen/logrus" +import ( + "github.com/sirupsen/logrus" +) const ( DefaultLoggerLevel = logrus.InfoLevel diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go index 1f71fa8..bc451fc 100644 --- a/pkg/profiling/task/network/analyze/events/data.go +++ b/pkg/profiling/task/network/analyze/events/data.go @@ -35,6 +35,7 @@ type SocketDataUploadEvent struct { ConnectionID uint64 RandomID uint64 DataID0 
uint64 + PrevDataID0 uint64 TotalSize0 uint64 Buffer [2048]byte } @@ -79,6 +80,10 @@ func (s *SocketDataUploadEvent) DataID() uint64 { return s.DataID0 } +func (s *SocketDataUploadEvent) PrevDataID() uint64 { + return s.PrevDataID0 +} + func (s *SocketDataUploadEvent) DataSequence() int { return int(s.Sequence0) } diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go index 707aaa8..511a088 100644 --- a/pkg/profiling/task/network/analyze/layer7/events.go +++ b/pkg/profiling/task/network/analyze/layer7/events.go @@ -29,7 +29,7 @@ import ( ) func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profiling.TaskConfig) { - l.socketDataQueue = btf.NewEventQueue(parallels, queueSize, func(num int) btf.PartitionContext { + l.socketDataQueue = btf.NewEventQueue("socket data resolver", parallels, queueSize, func(num int) btf.PartitionContext { return NewSocketDataPartitionContext(l, config) }) } diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go index 7224727..f966743 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go @@ -57,6 +57,13 @@ func (r *Request) MinDataID() int { return int(r.headerBuffer.FirstSocketBuffer().DataID()) } +func (r *Request) MaxDataID() int { + if r.bodyBuffer != nil { + return int(r.bodyBuffer.LastSocketBuffer().DataID()) + } + return int(r.headerBuffer.LastSocketBuffer().DataID()) +} + func (r *Request) Original() *http.Request { return r.original } diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go index 307f29f..c220664 100644 --- a/pkg/tools/btf/ebpf.go +++ b/pkg/tools/btf/ebpf.go @@ -62,7 +62,6 @@ func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.Collecti spec = readSpec }) - enhanceDataQueueOpts(bpfSpec) return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}} } diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go index 867d84e..bdbd7a6 100644 --- a/pkg/tools/btf/linker.go +++ b/pkg/tools/btf/linker.go @@ -83,6 +83,7 @@ type Linker struct { closeOnce sync.Once linkedUProbes map[string]bool + linkMutex sync.Mutex } func NewLinker() *Linker { @@ -136,6 +137,8 @@ func (m *Linker) AddSysCallWithKProbe(call string, linkK LinkFunc, p *ebpf.Progr if p == nil { return } + m.linkMutex.Lock() + defer m.linkMutex.Unlock() kprobe, err := linkK(syscallPrefix+call, p, nil) if err != nil { @@ -147,10 +150,13 @@ func (m *Linker) AddSysCallWithKProbe(call string, linkK LinkFunc, p *ebpf.Progr } func (m *Linker) AddTracePoint(sys, name string, p *ebpf.Program) { + m.linkMutex.Lock() + defer m.linkMutex.Unlock() l, e := link.Tracepoint(sys, name, p, nil) if e != nil { m.errors = multierror.Append(m.errors, fmt.Errorf("open %s error: %v", name, e)) } else { + log.Debugf("attach to the tracepoint: sys: %s, name: %s", sys, name) m.closers = append(m.closers, l) } } @@ -264,6 +270,8 @@ func (u *UProbeExeFile) AddLinkWithType(symbol string, enter bool, p *ebpf.Progr } func (u *UProbeExeFile) addLinkWithType0(symbol string, enter bool, p *ebpf.Program, customizeAddress uint64) (link.Link, error) { + u.linker.linkMutex.Lock() + defer u.linker.linkMutex.Unlock() // check already linked uprobeIdentity := fmt.Sprintf("%s_%s_%t_%d", u.addr, symbol, enter, customizeAddress) if u.linker.linkedUProbes[uprobeIdentity] { diff 
--git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go index 290f717..c5ce07b 100644 --- a/pkg/tools/btf/queue.go +++ b/pkg/tools/btf/queue.go @@ -21,9 +21,8 @@ import ( "context" "fmt" "hash/fnv" - "os" - "strings" "sync" + "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/perf" @@ -32,50 +31,9 @@ import ( const dataQueuePrefix = "rover_data_queue_" -var ( - ringbufChecker sync.Once - ringbufAvailable bool -) - -func isRingbufAvailable() bool { - ringbufChecker.Do(func() { - buf, err := ebpf.NewMap(&ebpf.MapSpec{ - Type: ebpf.RingBuf, - MaxEntries: uint32(os.Getpagesize()), - }) - - buf.Close() - - ringbufAvailable = err == nil - - if ringbufAvailable { - log.Infof("detect the ring buffer is available in current system for enhancement of data queue") - } - }) - - return ringbufAvailable -} - -func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) { - it := bpfSpec.Types.Iterate() - for it.Next() { - if !strings.HasPrefix(it.Type.TypeName(), dataQueuePrefix) { - continue - } - if err := validateGlobalConstVoidPtrVar(it.Type); err != nil { - panic(fmt.Errorf("invalid global const void ptr var %s: %v", it.Type.TypeName(), err)) - } - - // if the ringbuf not available, use perf event array - if !isRingbufAvailable() { - mapName := strings.TrimPrefix(it.Type.TypeName(), dataQueuePrefix) - mapSpec := bpfSpec.Maps[mapName] - mapSpec.Type = ebpf.PerfEventArray - mapSpec.KeySize = 4 - mapSpec.ValueSize = 4 - } - } -} +// queueChannelReducingCountCheckInterval is the interval to check the queue channel reducing count +// if the reducing count is almost full, then added a warning log +const queueChannelReducingCountCheckInterval = time.Second * 5 type queueReader interface { Read() ([]byte, error) @@ -125,6 +83,7 @@ func (p *perfQueueReader) Close() error { type ringBufReader struct { reader *ringbuf.Reader + name string } func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) { @@ -132,7 +91,7 @@ func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) { if err != nil { return nil, err } - return &ringBufReader{reader: reader}, nil + return &ringBufReader{reader: reader, name: emap.String()}, nil } func (r *ringBufReader) Read() ([]byte, error) { @@ -153,6 +112,7 @@ type PartitionContext interface { } type EventQueue struct { + name string count int receivers []*mapReceiver partitions []*partition @@ -168,12 +128,12 @@ type mapReceiver struct { parallels int } -func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(partitionNum int) PartitionContext) *EventQueue { +func NewEventQueue(name string, partitionCount, sizePerPartition int, contextGenerator func(partitionNum int) PartitionContext) *EventQueue { partitions := make([]*partition, 0) for i := 0; i < partitionCount; i++ { partitions = append(partitions, newPartition(i, sizePerPartition, contextGenerator(i))) } - return &EventQueue{count: partitionCount, partitions: partitions} + return &EventQueue{name: name, count: partitionCount, partitions: partitions} } func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, parallels int, dataSupplier func() interface{}, @@ -236,6 +196,27 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) { } }(ctx, i) } + + // channel reducing count check + go func() { + ticker := time.NewTicker(queueChannelReducingCountCheckInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + for _, p := range e.partitions { + reducingCount := len(p.channel) + if reducingCount > cap(p.channel)*9/10 { + log.Warnf("queue %s partition %d 
reducing count is almost full, "+ + "please trying to increase the parallels count or queue size, status: %d/%d", + e.name, p.index, reducingCount, cap(p.channel)) + } + } + case <-ctx.Done(): + return + } + } + }() } func (e *EventQueue) routerTransformer(data interface{}, routeGenerator func(data interface{}) string) { diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index c16a9fd..adf35ff 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -20,6 +20,7 @@ package buffer import ( "container/list" "errors" + "fmt" "io" "sync" "time" @@ -55,6 +56,8 @@ type SocketDataBuffer interface { BufferLen() int // DataID data id of the buffer DataID() uint64 + // PrevDataID the previous data id of the buffer + PrevDataID() uint64 // DataSequence the data sequence under same data id DataSequence() int // IsStart this buffer is start of the same data id @@ -83,11 +86,19 @@ type DataIDRange struct { IsToBufferReadFinished bool } +func (r *DataIDRange) String() string { + return fmt.Sprintf("from: %d, to: %d, isToBufferReadFinished: %t", r.From, r.To, r.IsToBufferReadFinished) +} + type Buffer struct { dataEvents *list.List detailEvents *list.List validated bool // the events list is validated or not + // shouldResetPosition means the buffer have new buffer appended into the buffer + // so the position should reset to the head, and re-read the buffer + shouldResetPosition bool + eventLocker sync.RWMutex head *Position @@ -184,6 +195,10 @@ func (p *Position) DataID() uint64 { return p.element.Value.(SocketDataBuffer).DataID() } +func (p *Position) PrevDataID() uint64 { + return p.element.Value.(SocketDataBuffer).PrevDataID() +} + func (p *Position) Seq() int { return p.element.Value.(SocketDataBuffer).DataSequence() } @@ -609,6 +624,12 @@ func (r *Buffer) PrepareForReading() bool { if r.dataEvents.Len() == 0 { return false } + // if the buffer should reset, then reset the position and cannot be read from current position + if r.shouldResetPosition { + r.ResetForLoopReading() + r.shouldResetPosition = false + return false + } if r.head == nil || r.head.element == nil { // read in the first element r.eventLocker.RLock() @@ -741,8 +762,16 @@ func (r *Buffer) AppendDataEvent(event SocketDataBuffer) { r.eventLocker.Lock() defer r.eventLocker.Unlock() + defer func() { + // if the current position is not nil and the current reading data id is bigger than the event data id + if r.current != nil && r.current.DataID() > event.DataID() { + r.shouldResetPosition = true + } + }() + if r.dataEvents.Len() == 0 { r.dataEvents.PushFront(event) + r.shouldResetPosition = true return } if r.dataEvents.Back().Value.(SocketDataBuffer).DataID() < event.DataID() { @@ -802,6 +831,8 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int { } func (r *Buffer) DataLength() int { + r.eventLocker.RLock() + defer r.eventLocker.RUnlock() if r.dataEvents == nil { return 0 } diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go index b1357cd..fff1ec2 100644 --- a/pkg/tools/ip/conntrack.go +++ b/pkg/tools/ip/conntrack.go @@ -18,11 +18,10 @@ package ip import ( + "github.com/apache/skywalking-rover/pkg/logger" "net" "syscall" - "github.com/apache/skywalking-rover/pkg/logger" - "github.com/florianl/go-conntrack" "golang.org/x/sys/unix" @@ -36,9 +35,6 @@ var numberStrategies = []struct { }{{ name: "tcp", proto: syscall.IPPROTO_TCP, -}, { - name: "udp", - proto: syscall.IPPROTO_UDP, }} type ConnTrack struct { @@ -77,19 +73,6 @@ func (c *ConnTrack) 
UpdateRealPeerAddress(addr *SocketPair) bool { return true } } - - // using dump to query protocol - dump, e := c.tracker.Dump(conntrack.Conntrack, family) - if e != nil { - log.Debug("cannot dump the conntrack session, error: ", e) - return false - } - if res := c.filterValidateReply(dump, tuple); res != nil { - addr.DestIP = res.Src.String() - log.Debugf("found the connection from the dump all conntrack, src: %s:%d, dst: %s:%d, proto number: %d", - tuple.Src, *tuple.Proto.SrcPort, tuple.Dst, *tuple.Proto.DstPort, *tuple.Proto.Number) - return true - } return false }
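
The HTTP/1.x change above stops pairing a response with whichever half request happens to sit at the front of the list; instead it matches on data ids, so a lost or delayed response no longer shifts every later pairing by one. Below is a minimal Go sketch of that matching rule only; the pendingRequest type and findMatchingRequest name are illustrative stand-ins for the real reader.Request and HTTP1Metrics.findMatchesRequest.

package main

import (
    "container/list"
    "fmt"
)

// pendingRequest is a hypothetical stand-in for reader.Request: it only
// carries the data id of the last socket-data event that made up the request.
type pendingRequest struct {
    name      string
    maxDataID uint64
}

// findMatchingRequest mirrors the pairing rule added in findMatchesRequest:
// a half request matches a response when the request's tail data id equals
// the response's previous data id, or when tail id + 1 equals the response's
// first data id.
func findMatchingRequest(half *list.List, respDataID, respPrevDataID uint64) *pendingRequest {
    for e := half.Front(); e != nil; e = e.Next() {
        req := e.Value.(*pendingRequest)
        if req.maxDataID == respPrevDataID || req.maxDataID+1 == respDataID {
            half.Remove(e) // consumed: this request is paired with the response
            return req
        }
    }
    return nil // no match yet; skip the response instead of mis-pairing it
}

func main() {
    half := list.New()
    half.PushBack(&pendingRequest{name: "GET /a", maxDataID: 10})
    half.PushBack(&pendingRequest{name: "GET /b", maxDataID: 14})

    // A response whose first event reports prev_data_id=14 pairs with "GET /b",
    // even though "GET /a" is still waiting for its (possibly lost) response.
    if req := findMatchingRequest(half, 15, 14); req != nil {
        fmt.Println("paired with", req.name)
    }
}

The kernel-side prev_data_id field added to socket_data_upload_event is what makes this correlation possible on the userspace side.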
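The pkg/tools/btf/queue.go hunk adds a watchdog that samples each partition channel every five seconds and warns when it is more than 90% full. A self-contained sketch of that watermark check, using a plain buffered channel and the standard library logger instead of the rover logger:

package main

import (
    "context"
    "log"
    "time"
)

// watchQueueDepth periodically samples a buffered channel and warns when it
// is more than 90% full, mirroring the check added to EventQueue. The
// 5-second interval matches queueChannelReducingCountCheckInterval.
func watchQueueDepth(ctx context.Context, name string, partition int, ch <-chan interface{}) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            pending := len(ch)
            if pending > cap(ch)*9/10 {
                log.Printf("queue %s partition %d is almost full, "+
                    "consider increasing the parallels count or queue size, status: %d/%d",
                    name, partition, pending, cap(ch))
            }
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    ch := make(chan interface{}, 100)
    ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
    defer cancel()
    go watchQueueDepth(ctx, "socket data analyzer", 0, ch)
    for i := 0; i < 95; i++ { // fill the channel past the 90% watermark
        ch <- i
    }
    <-ctx.Done()
}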
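The new GRPCSender moves network I/O off the log-building path: batches are split into chunks of at most maxLogsPerSend connections, queued under a mutex, and drained by a single goroutine that is woken through a non-blocking notify channel. A reduced sketch of that producer/consumer shape, with the gRPC stream replaced by a print and a local batch type standing in for BatchLogs:

package main

import (
    "container/list"
    "context"
    "fmt"
    "sync"
    "time"
)

type batch struct{ connections []string } // stand-in for *BatchLogs

type asyncSender struct {
    mu     sync.Mutex
    queue  *list.List
    notify chan bool
}

func newAsyncSender() *asyncSender {
    return &asyncSender{queue: list.New(), notify: make(chan bool, 1)}
}

// AddBatch splits a batch and wakes the sender without ever blocking the caller.
func (s *asyncSender) AddBatch(b *batch, maxPerSend int) {
    s.mu.Lock()
    for start := 0; start < len(b.connections); start += maxPerSend {
        end := start + maxPerSend
        if end > len(b.connections) {
            end = len(b.connections)
        }
        s.queue.PushBack(&batch{connections: b.connections[start:end]})
    }
    s.mu.Unlock()

    select {
    case s.notify <- true: // poke the drain goroutine
    default: // already notified, nothing to do
    }
}

func (s *asyncSender) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case <-s.notify:
                for b := s.pop(); b != nil; b = s.pop() {
                    fmt.Printf("sending %d connections\n", len(b.connections)) // gRPC stream in the real sender
                }
            case <-ctx.Done():
                return
            }
        }
    }()
}

func (s *asyncSender) pop() *batch {
    s.mu.Lock()
    defer s.mu.Unlock()
    front := s.queue.Front()
    if front == nil {
        return nil
    }
    s.queue.Remove(front)
    return front.Value.(*batch)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    s := newAsyncSender()
    s.Start(ctx)
    s.AddBatch(&batch{connections: []string{"c1", "c2", "c3"}}, 2)
    <-ctx.Done()
}

Using a buffered notify channel of size one means AddBatch never blocks and repeated notifications collapse into a single wake-up, which is the same trick the patch relies on.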
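ConnectionManager.checkConnectionIsExist bounds how often the manager re-validates a connection against the BPF active-connection map: at most once every 30 seconds, marking the connection deletable when the key is missing or the random ID has been reused. A simplified sketch with the map lookup stubbed behind a function value; trackedConnection, stillExists, and the lookup parameter are hypothetical names, and errKeyNotExist stands in for ebpf.ErrKeyNotExist.

package main

import (
    "errors"
    "fmt"
    "time"
)

var errKeyNotExist = errors.New("key does not exist") // stands in for ebpf.ErrKeyNotExist

type trackedConnection struct {
    id, randomID  uint64
    markDeletable bool
    lastCheck     time.Time
}

const checkExistInterval = 30 * time.Second // mirrors connectionCheckExistTime

// stillExists re-validates a connection against the kernel map at most once
// per interval, mirroring checkConnectionIsExist in simplified form.
func stillExists(con *trackedConnection, lookupActive func(id uint64) (uint64, error)) bool {
    if time.Since(con.lastCheck) < checkExistInterval {
        return true // checked recently, skip the map lookup
    }
    con.lastCheck = time.Now()

    randomID, err := lookupActive(con.id)
    switch {
    case errors.Is(err, errKeyNotExist):
        con.markDeletable = true // the kernel no longer tracks this connection
        return false
    case err != nil:
        return false // lookup failed; report not confirmed, as the original check does
    case randomID != 0 && randomID != con.randomID:
        con.markDeletable = true // same FD reused by a newer connection
        return false
    }
    return true
}

func main() {
    con := &trackedConnection{id: 42, randomID: 7}
    gone := func(uint64) (uint64, error) { return 0, errKeyNotExist }
    fmt.Println(stillExists(con, gone), con.markDeletable) // false true
}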
