This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/tmp_disable_reading by this
push:
new 6be42e2 add test log
6be42e2 is described below
commit 6be42e2e47281fe903b572f8dcaf7c18c7fe745d
Author: mrproliu <[email protected]>
AuthorDate: Thu Dec 19 19:37:35 2024 +0800
add test log
---
pkg/tools/btf/queue.go | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index c02de3f..5a16e51 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -24,6 +24,8 @@ import (
"os"
"strings"
"sync"
+ "sync/atomic"
+ "time"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
@@ -174,6 +176,19 @@ func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(p
for i := 0; i < partitionCount; i++ {
partitions = append(partitions, newPartition(i, sizePerPartition, contextGenerator(i)))
}
+
+ go func() {
+ ticker := time.NewTicker(time.Second * 5)
+ for {
+ select {
+ case <-ticker.C:
+ for _, p := range partitions {
+ log.Infof("+++partition %d, count: %d", p.index, atomic.LoadInt64(p.count))
+ atomic.StoreInt64(p.count, 0)
+ }
+ }
+ }
+ }()
return &EventQueue{count: partitionCount, partitions: partitions}
}
@@ -202,6 +217,7 @@ func (e *EventQueue) Push(key string, data interface{}) {
// append data
e.partitions[sum32%e.count].channel <- data
+ atomic.AddInt64(e.partitions[sum32%e.count].count, 1)
}
func (e *EventQueue) PartitionContexts() []PartitionContext {
@@ -254,12 +270,15 @@ type partition struct {
index int
channel chan interface{}
ctx PartitionContext
+ count *int64
}
func newPartition(index, size int, ctx PartitionContext) *partition {
+ i := int64(0)
return &partition{
index: index,
channel: make(chan interface{}, size),
ctx: ctx,
+ count: &i,
}
}