This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch perf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/perf by this push:
     new f2318bf  reducing partition calc
f2318bf is described below

commit f2318bf10e4317390fc0b9881fe770337dcc395b
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 25 16:02:05 2024 +0800

    reducing partition calc
---
 pkg/accesslog/collector/connection.go              |  8 ++--
 pkg/accesslog/collector/protocols/queue.go         |  8 ++--
 .../task/network/analyze/layer7/events.go          |  8 ++--
 pkg/tools/btf/linker.go                            | 49 +++++++---------------
 pkg/tools/btf/queue.go                             | 18 +++-----
 5 files changed, 34 insertions(+), 57 deletions(-)

diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index d861447..4fc6823 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -84,13 +84,13 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize),
                ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} 
{
                        return &events.SocketConnectEvent{}
-               }, func(data interface{}) string {
-                       return fmt.Sprintf("%d", 
data.(*events.SocketConnectEvent).ConID)
+               }, func(data interface{}) int {
+                       return int(data.(*events.SocketConnectEvent).ConID)
                })
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() 
interface{} {
                return &events.SocketCloseEvent{}
-       }, func(data interface{}) string {
-               return fmt.Sprintf("%d", 
data.(*events.SocketCloseEvent).ConnectionID)
+       }, func(data interface{}) int {
+               return int(data.(*events.SocketCloseEvent).ConnectionID)
        })
        c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
 
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index c9c2fdf..a4577f7 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -97,14 +97,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, 
int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
                        return q.detailSupplier()
-               }, func(data interface{}) string {
-                       return fmt.Sprintf("%d", 
data.(events.SocketDetail).GetConnectionID())
+               }, func(data interface{}) int {
+                       return int(data.(events.SocketDetail).GetConnectionID())
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, 
int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
                        return &events.SocketDataUploadEvent{}
-               }, func(data interface{}) string {
-                       return fmt.Sprintf("%d", 
data.(*events.SocketDataUploadEvent).ConnectionID)
+               }, func(data interface{}) int {
+                       return 
int(data.(*events.SocketDataUploadEvent).ConnectionID)
                })
 
        q.eventQueue.Start(ctx, q.context.BPF.Linker)
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 511a088..1c01084 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -38,15 +38,15 @@ func (l *Listener) startSocketData(ctx context.Context, 
bpfLoader *bpf.Loader) {
        // socket buffer data
        l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
                return &analyzeBase.SocketDataUploadEvent{}
-       }, func(data interface{}) string {
-               return 
data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
+       }, func(data interface{}) int {
+               return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
        })
 
        // socket detail
        l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
                return &analyzeBase.SocketDetailEvent{}
-       }, func(data interface{}) string {
-               return 
data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()
+       }, func(data interface{}) int {
+               return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
        })
 
        l.socketDataQueue.Start(ctx, bpfLoader.Linker)
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 524a193..bdbd7a6 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -198,41 +198,24 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap 
*ebpf.Map, dataSupplier fun
                                continue
                        }
 
-                       go m.handleReadEvent(sample, dataSupplier, bufReader)
-                       //data := dataSupplier()
-                       //if r, ok := data.(reader.EventReader); ok {
-                       //      sampleReader := reader.NewReader(sample)
-                       //      r.ReadFrom(sampleReader)
-                       //      if readErr := sampleReader.HasError(); readErr 
!= nil {
-                       //              log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
-                       //              continue
-                       //      }
-                       //} else {
-                       //      if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
-                       //              log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
-                       //              continue
-                       //      }
-                       //}
-                       //
-                       //bufReader(data)
-               }
-       }()
-}
+                       data := dataSupplier()
+                       if r, ok := data.(reader.EventReader); ok {
+                               sampleReader := reader.NewReader(sample)
+                               r.ReadFrom(sampleReader)
+                               if readErr := sampleReader.HasError(); readErr 
!= nil {
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                                       continue
+                               }
+                       } else {
+                               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                                       continue
+                               }
+                       }
 
-func (m *Linker) handleReadEvent(sample []byte, dataSupplier func() 
interface{}, bufReader RingBufferReader) {
-       data := dataSupplier()
-       if r, ok := data.(reader.EventReader); ok {
-               sampleReader := reader.NewReader(sample)
-               r.ReadFrom(sampleReader)
-               if readErr := sampleReader.HasError(); readErr != nil {
-                       log.Warnf("parsing data from ringbuffer error: %v", 
readErr)
-               }
-       } else {
-               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
-                       log.Warnf("parsing data from ringbuffer error: %v", err)
+                       bufReader(data)
                }
-       }
-       bufReader(data)
+       }()
 }
 
 func (m *Linker) OpenUProbeExeFile(path string) *UProbeExeFile {
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 9dd8ed1..fdd8754 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,7 +20,6 @@ package btf
 import (
        "context"
        "fmt"
-       "hash/fnv"
        "sync"
        "time"
 
@@ -122,7 +121,7 @@ type mapReceiver struct {
        emap         *ebpf.Map
        perCPUBuffer int
        dataSupplier func() interface{}
-       router       func(data interface{}) string
+       router       func(data interface{}) int
        parallels    int
 }
 
@@ -135,7 +134,7 @@ func NewEventQueue(name string, partitionCount, 
sizePerPartition int, contextGen
 }
 
 func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataSupplier func() interface{},
-       routeGenerator func(data interface{}) string) {
+       routeGenerator func(data interface{}) int) {
        e.receivers = append(e.receivers, &mapReceiver{
                emap:         emap,
                perCPUBuffer: perCPUBufferSize,
@@ -151,14 +150,9 @@ func (e *EventQueue) Start(ctx context.Context, linker 
*Linker) {
        })
 }
 
-func (e *EventQueue) Push(key string, data interface{}) {
-       // calculate hash of key
-       h := fnv.New32a()
-       h.Write([]byte(key))
-       sum32 := int(h.Sum32())
-
+func (e *EventQueue) Push(key int, data interface{}) {
        // append data
-       e.partitions[sum32%e.count].channel <- data
+       e.partitions[key%e.count].channel <- data
 }
 
 func (e *EventQueue) PartitionContexts() []PartitionContext {
@@ -174,7 +168,7 @@ func (e *EventQueue) start0(ctx context.Context, linker 
*Linker) {
                func(receiver *mapReceiver) {
                        linker.ReadEventAsyncWithBufferSize(receiver.emap, 
func(data interface{}) {
                                e.routerTransformer(data, receiver.router)
-                       }, receiver.perCPUBuffer, 1, receiver.dataSupplier)
+                       }, receiver.perCPUBuffer, r.parallels, 
receiver.dataSupplier)
                }(r)
        }
 
@@ -217,7 +211,7 @@ func (e *EventQueue) start0(ctx context.Context, linker 
*Linker) {
        }()
 }
 
-func (e *EventQueue) routerTransformer(data interface{}, routeGenerator 
func(data interface{}) string) {
+func (e *EventQueue) routerTransformer(data interface{}, routeGenerator 
func(data interface{}) int) {
        key := routeGenerator(data)
        e.Push(key, data)
 }

Reply via email to