This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/tmp_disable_reading by this push:
new 26786d0 add queue full warning log
26786d0 is described below
commit 26786d0b95f0f2f3226de865ec61e1fdd72dcd09
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 21 20:59:47 2024 +0800
add queue full warning log
---
 pkg/accesslog/collector/connection.go      |  2 +-
 pkg/accesslog/collector/protocols/queue.go |  3 +-
 .../task/network/analyze/layer7/events.go  |  2 +-
 pkg/tools/btf/queue.go                     | 37 +++++++++++++++++-----
 4 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index a5565a1..d861447 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -77,7 +77,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
 	if err != nil {
 		connectionLogger.Warnf("cannot create the connection tracker, %v", err)
 	}
-	c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
+	c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels,
 		ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
 			return newConnectionPartitionContext(ctx, track)
 		})
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 3ae9b6e..6a65d5e 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -89,7 +89,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
 }
 
 func (q *AnalyzeQueue) Start(ctx context.Context) {
-	q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
+	q.eventQueue = btf.NewEventQueue("socket data analyzer",
+		q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
 		func(num int) btf.PartitionContext {
 			return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
 		})
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 707aaa8..511a088 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -29,7 +29,7 @@ import (
 )
 
 func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profiling.TaskConfig) {
-	l.socketDataQueue = btf.NewEventQueue(parallels, queueSize, func(num int) btf.PartitionContext {
+	l.socketDataQueue = btf.NewEventQueue("socket data resolver", parallels, queueSize, func(num int) btf.PartitionContext {
 		return NewSocketDataPartitionContext(l, config)
 	})
 }
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 3846b29..71d187f 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -24,6 +24,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/perf"
@@ -32,6 +33,10 @@ import (
 
 const dataQueuePrefix = "rover_data_queue_"
 
+// queueChannelReducingCountCheckInterval is the interval at which to check the queue channel reducing count;
+// if the reducing count is almost full, then add a warning log
+const queueChannelReducingCountCheckInterval = time.Second * 5
+
 var (
 	ringbufChecker   sync.Once
 	ringbufAvailable bool
@@ -155,6 +160,7 @@ type PartitionContext interface {
 }
 
 type EventQueue struct {
+	name       string
 	count      int
 	receivers  []*mapReceiver
 	partitions []*partition
@@ -170,12 +176,12 @@ type mapReceiver struct {
 	parallels int
 }
 
-func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(partitionNum int) PartitionContext) *EventQueue {
+func NewEventQueue(name string, partitionCount, sizePerPartition int, contextGenerator func(partitionNum int) PartitionContext) *EventQueue {
 	partitions := make([]*partition, 0)
 	for i := 0; i < partitionCount; i++ {
 		partitions = append(partitions, newPartition(i, sizePerPartition, contextGenerator(i)))
 	}
-	return &EventQueue{count: partitionCount, partitions: partitions}
+	return &EventQueue{name: name, count: partitionCount, partitions: partitions}
 }
 
 func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, parallels int, dataSupplier func() interface{},
@@ -226,17 +232,11 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
 		go func(ctx context.Context, inx int) {
 			p := e.partitions[inx]
 			p.ctx.Start(ctx)
-			var t = 0
 			for {
 				select {
 				// consume the data
 				case data := <-p.channel:
 					p.ctx.Consume(data)
-					t++
-					if t%1000 == 0 {
-						log.Infof("reducing count in partion: %d, %d", p.index, len(p.channel))
-						t = 0
-					}
 				// shutdown the consumer
 				case <-ctx.Done():
 					return
@@ -244,6 +244,27 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
 			}
 		}(ctx, i)
 	}
+
+	// channel reducing count check
+	go func() {
+		ticker := time.NewTicker(queueChannelReducingCountCheckInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				for _, p := range e.partitions {
+					reducingCount := len(p.channel)
+					if reducingCount > cap(p.channel)*9/10 {
+						log.Warnf("queue %s partition %d reducing count is almost full, "+
+							"please try to increase the parallels count or queue size, status: %d/%d",
+							e.name, p.index, reducingCount, cap(p.channel))
+					}
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
 }
 
 func (e *EventQueue) routerTransformer(data interface{}, routeGenerator func(data interface{}) string) {
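
For readers skimming the diff, the change is twofold: NewEventQueue now takes a human-readable name so warnings can say which queue is backing up, and start0 replaces the old per-1000-events Infof with a watchdog goroutine that ticks every five seconds and warns when a partition channel's backlog exceeds nine tenths of its capacity. Below is a minimal, self-contained sketch of that watchdog pattern; the partition struct, the watchBacklog helper, and the demo in main are illustrative stand-ins written for this email, not the rover API.

package main

import (
	"context"
	"log"
	"time"
)

// partition mirrors the per-partition consumer channel from the diff.
type partition struct {
	index   int
	channel chan interface{}
}

// checkInterval mirrors queueChannelReducingCountCheckInterval (5s in the diff).
const checkInterval = 5 * time.Second

// watchBacklog warns when any partition channel is more than 90% full,
// using the same len(channel) > cap(channel)*9/10 test the commit adds.
func watchBacklog(ctx context.Context, name string, partitions []*partition) {
	go func() {
		ticker := time.NewTicker(checkInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				for _, p := range partitions {
					backlog := len(p.channel)
					if backlog > cap(p.channel)*9/10 {
						log.Printf("queue %s partition %d is almost full, status: %d/%d",
							name, p.index, backlog, cap(p.channel))
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Fill a small partition past the 90% threshold so the first tick warns.
	p := &partition{index: 0, channel: make(chan interface{}, 10)}
	for i := 0; i < 10; i++ {
		p.channel <- i
	}
	watchBacklog(ctx, "demo queue", []*partition{p})

	time.Sleep(6 * time.Second) // long enough for one 5s tick to fire
}

A backlog that stays above the 9/10 threshold means the consumers cannot drain the channel fast enough, which is why the new warning suggests raising the parallels count (AnalyzeParallels) or the queue size (QueueSize).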