This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/tmp_disable_reading by this push:
     new 6be42e2  add test log
6be42e2 is described below

commit 6be42e2e47281fe903b572f8dcaf7c18c7fe745d
Author: mrproliu <[email protected]>
AuthorDate: Thu Dec 19 19:37:35 2024 +0800

    add test log
---
 pkg/tools/btf/queue.go | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index c02de3f..5a16e51 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -24,6 +24,8 @@ import (
        "os"
        "strings"
        "sync"
+       "sync/atomic"
+       "time"
 
        "github.com/cilium/ebpf"
        "github.com/cilium/ebpf/perf"
@@ -174,6 +176,19 @@ func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(p
        for i := 0; i < partitionCount; i++ {
                partitions = append(partitions, newPartition(i, sizePerPartition, contextGenerator(i)))
        }
+
+       go func() {
+               ticker := time.NewTicker(time.Second * 5)
+               for {
+                       select {
+                       case <-ticker.C:
+                               for _, p := range partitions {
+                                       log.Infof("+++partition %d, count: %d", p.index, atomic.LoadInt64(p.count))
+                                       atomic.StoreInt64(p.count, 0)
+                               }
+                       }
+               }
+       }()
        return &EventQueue{count: partitionCount, partitions: partitions}
 }
 
@@ -202,6 +217,7 @@ func (e *EventQueue) Push(key string, data interface{}) {
 
        // append data
        e.partitions[sum32%e.count].channel <- data
+       atomic.AddInt64(e.partitions[sum32%e.count].count, 1)
 }
 
 func (e *EventQueue) PartitionContexts() []PartitionContext {
@@ -254,12 +270,15 @@ type partition struct {
        index   int
        channel chan interface{}
        ctx     PartitionContext
+       count   *int64
 }
 
 func newPartition(index, size int, ctx PartitionContext) *partition {
+       i := int64(0)
        return &partition{
                index:   index,
                channel: make(chan interface{}, size),
                ctx:     ctx,
+               count:   &i,
        }
 }

Reply via email to