This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/tmp_disable_reading by this push:
     new 26786d0  add queue full warning log
26786d0 is described below

commit 26786d0b95f0f2f3226de865ec61e1fdd72dcd09
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 21 20:59:47 2024 +0800

    add queue full warning log
---
 pkg/accesslog/collector/connection.go              |  2 +-
 pkg/accesslog/collector/protocols/queue.go         |  3 +-
 .../task/network/analyze/layer7/events.go          |  2 +-
 pkg/tools/btf/queue.go                             | 37 +++++++++++++++++-----
 4 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index a5565a1..d861447 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -77,7 +77,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
        if err != nil {
                connectionLogger.Warnf("cannot create the connection tracker, 
%v", err)
        }
-       c.eventQueue = 
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
+       c.eventQueue = btf.NewEventQueue("connection resolver", 
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
                ctx.Config.ConnectionAnalyze.QueueSize, func(num int) 
btf.PartitionContext {
                        return newConnectionPartitionContext(ctx, track)
                })
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 3ae9b6e..6a65d5e 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -89,7 +89,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
 }
 
 func (q *AnalyzeQueue) Start(ctx context.Context) {
-       q.eventQueue = 
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, 
q.context.Config.ProtocolAnalyze.QueueSize,
+       q.eventQueue = btf.NewEventQueue("socket data analyzer",
+               q.context.Config.ProtocolAnalyze.AnalyzeParallels, 
q.context.Config.ProtocolAnalyze.QueueSize,
                func(num int) btf.PartitionContext {
                        return NewPartitionContext(q.context, num, 
q.supportAnalyzers(q.context))
                })
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 707aaa8..511a088 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -29,7 +29,7 @@ import (
 )
 
 func (l *Listener) initSocketDataQueue(parallels, queueSize int, config 
*profiling.TaskConfig) {
-       l.socketDataQueue = btf.NewEventQueue(parallels, queueSize, func(num 
int) btf.PartitionContext {
+       l.socketDataQueue = btf.NewEventQueue("socket data resolver", 
parallels, queueSize, func(num int) btf.PartitionContext {
                return NewSocketDataPartitionContext(l, config)
        })
 }
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 3846b29..71d187f 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -24,6 +24,7 @@ import (
        "os"
        "strings"
        "sync"
+       "time"
 
        "github.com/cilium/ebpf"
        "github.com/cilium/ebpf/perf"
@@ -32,6 +33,10 @@ import (
 
 const dataQueuePrefix = "rover_data_queue_"
 
+// queueChannelReducingCountCheckInterval is the interval to check the queue 
channel reducing count
+// if the reducing count is almost full, then added a warning log
+const queueChannelReducingCountCheckInterval = time.Second * 5
+
 var (
        ringbufChecker   sync.Once
        ringbufAvailable bool
@@ -155,6 +160,7 @@ type PartitionContext interface {
 }
 
 type EventQueue struct {
+       name       string
        count      int
        receivers  []*mapReceiver
        partitions []*partition
@@ -170,12 +176,12 @@ type mapReceiver struct {
        parallels    int
 }
 
-func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator 
func(partitionNum int) PartitionContext) *EventQueue {
+func NewEventQueue(name string, partitionCount, sizePerPartition int, 
contextGenerator func(partitionNum int) PartitionContext) *EventQueue {
        partitions := make([]*partition, 0)
        for i := 0; i < partitionCount; i++ {
                partitions = append(partitions, newPartition(i, 
sizePerPartition, contextGenerator(i)))
        }
-       return &EventQueue{count: partitionCount, partitions: partitions}
+       return &EventQueue{name: name, count: partitionCount, partitions: 
partitions}
 }
 
 func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataSupplier func() interface{},
@@ -226,17 +232,11 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
                go func(ctx context.Context, inx int) {
                        p := e.partitions[inx]
                        p.ctx.Start(ctx)
-                       var t = 0
                        for {
                                select {
                                // consume the data
                                case data := <-p.channel:
                                        p.ctx.Consume(data)
-                                       t++
-                                       if t%1000 == 0 {
-                                               log.Infof("reducing count in 
partion: %d, %d", p.index, len(p.channel))
-                                               t = 0
-                                       }
                                // shutdown the consumer
                                case <-ctx.Done():
                                        return
@@ -244,6 +244,27 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
                        }
                }(ctx, i)
        }
+
+       // channel reducing count check
+       go func() {
+               ticker := time.NewTicker(queueChannelReducingCountCheckInterval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-ticker.C:
+                               for _, p := range e.partitions {
+                                       reducingCount := len(p.channel)
+                                       if reducingCount > cap(p.channel)*9/10 {
+                                               log.Warnf("queue %s partition 
%d reducing count is almost full, "+
+                                                       "please trying to 
increase the parallels count or queue size, status: %d/%d",
+                                                       e.name, p.index, 
reducingCount, cap(p.channel))
+                                       }
+                               }
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
 }
 
 func (e *EventQueue) routerTransformer(data interface{}, routeGenerator 
func(data interface{}) string) {

Reply via email to