This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch perf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/perf by this push:
new f2318bf reducing partition calc
f2318bf is described below
commit f2318bf10e4317390fc0b9881fe770337dcc395b
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 25 16:02:05 2024 +0800
reducing partition calc
---
pkg/accesslog/collector/connection.go | 8 ++--
pkg/accesslog/collector/protocols/queue.go | 8 ++--
.../task/network/analyze/layer7/events.go | 8 ++--
pkg/tools/btf/linker.go | 49 +++++++---------------
pkg/tools/btf/queue.go | 18 +++-----
5 files changed, 34 insertions(+), 57 deletions(-)
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index d861447..4fc6823 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -84,13 +84,13 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize),
ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{}
{
return &events.SocketConnectEvent{}
- }, func(data interface{}) string {
- return fmt.Sprintf("%d",
data.(*events.SocketConnectEvent).ConID)
+ }, func(data interface{}) int {
+ return int(data.(*events.SocketConnectEvent).ConID)
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func()
interface{} {
return &events.SocketCloseEvent{}
- }, func(data interface{}) string {
- return fmt.Sprintf("%d",
data.(*events.SocketCloseEvent).ConnectionID)
+ }, func(data interface{}) int {
+ return int(data.(*events.SocketCloseEvent).ConnectionID)
})
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index c9c2fdf..a4577f7 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -97,14 +97,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue,
int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
return q.detailSupplier()
- }, func(data interface{}) string {
- return fmt.Sprintf("%d",
data.(events.SocketDetail).GetConnectionID())
+ }, func(data interface{}) int {
+ return int(data.(events.SocketDetail).GetConnectionID())
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue,
int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
return &events.SocketDataUploadEvent{}
- }, func(data interface{}) string {
- return fmt.Sprintf("%d",
data.(*events.SocketDataUploadEvent).ConnectionID)
+ }, func(data interface{}) int {
+ return
int(data.(*events.SocketDataUploadEvent).ConnectionID)
})
q.eventQueue.Start(ctx, q.context.BPF.Linker)
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 511a088..1c01084 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -38,15 +38,15 @@ func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
// socket buffer data
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDataUploadEvent{}
- }, func(data interface{}) string {
- return
data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
+ }, func(data interface{}) int {
+ return
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
})
// socket detail
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDetailEvent{}
- }, func(data interface{}) string {
- return
data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()
+ }, func(data interface{}) int {
+ return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
})
l.socketDataQueue.Start(ctx, bpfLoader.Linker)
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 524a193..bdbd7a6 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -198,41 +198,24 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier fun
continue
}
- go m.handleReadEvent(sample, dataSupplier, bufReader)
- //data := dataSupplier()
- //if r, ok := data.(reader.EventReader); ok {
- // sampleReader := reader.NewReader(sample)
- // r.ReadFrom(sampleReader)
- // if readErr := sampleReader.HasError(); readErr
!= nil {
- // log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
- // continue
- // }
- //} else {
- // if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
- // log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
- // continue
- // }
- //}
- //
- //bufReader(data)
- }
- }()
-}
+ data := dataSupplier()
+ if r, ok := data.(reader.EventReader); ok {
+ sampleReader := reader.NewReader(sample)
+ r.ReadFrom(sampleReader)
+ if readErr := sampleReader.HasError(); readErr
!= nil {
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ continue
+ }
+ } else {
+ if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ continue
+ }
+ }
-func (m *Linker) handleReadEvent(sample []byte, dataSupplier func()
interface{}, bufReader RingBufferReader) {
- data := dataSupplier()
- if r, ok := data.(reader.EventReader); ok {
- sampleReader := reader.NewReader(sample)
- r.ReadFrom(sampleReader)
- if readErr := sampleReader.HasError(); readErr != nil {
- log.Warnf("parsing data from ringbuffer error: %v",
readErr)
- }
- } else {
- if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
- log.Warnf("parsing data from ringbuffer error: %v", err)
+ bufReader(data)
}
- }
- bufReader(data)
+ }()
}
func (m *Linker) OpenUProbeExeFile(path string) *UProbeExeFile {
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 9dd8ed1..fdd8754 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,7 +20,6 @@ package btf
import (
"context"
"fmt"
- "hash/fnv"
"sync"
"time"
@@ -122,7 +121,7 @@ type mapReceiver struct {
emap *ebpf.Map
perCPUBuffer int
dataSupplier func() interface{}
- router func(data interface{}) string
+ router func(data interface{}) int
parallels int
}
@@ -135,7 +134,7 @@ func NewEventQueue(name string, partitionCount, sizePerPartition int, contextGen
}
func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize,
parallels int, dataSupplier func() interface{},
- routeGenerator func(data interface{}) string) {
+ routeGenerator func(data interface{}) int) {
e.receivers = append(e.receivers, &mapReceiver{
emap: emap,
perCPUBuffer: perCPUBufferSize,
@@ -151,14 +150,9 @@ func (e *EventQueue) Start(ctx context.Context, linker *Linker) {
})
}
-func (e *EventQueue) Push(key string, data interface{}) {
- // calculate hash of key
- h := fnv.New32a()
- h.Write([]byte(key))
- sum32 := int(h.Sum32())
-
+func (e *EventQueue) Push(key int, data interface{}) {
// append data
- e.partitions[sum32%e.count].channel <- data
+ e.partitions[key%e.count].channel <- data
}
func (e *EventQueue) PartitionContexts() []PartitionContext {
@@ -174,7 +168,7 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
func(receiver *mapReceiver) {
linker.ReadEventAsyncWithBufferSize(receiver.emap,
func(data interface{}) {
e.routerTransformer(data, receiver.router)
- }, receiver.perCPUBuffer, 1, receiver.dataSupplier)
+ }, receiver.perCPUBuffer, r.parallels,
receiver.dataSupplier)
}(r)
}
@@ -217,7 +211,7 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
}()
}
-func (e *EventQueue) routerTransformer(data interface{}, routeGenerator
func(data interface{}) string) {
+func (e *EventQueue) routerTransformer(data interface{}, routeGenerator
func(data interface{}) int) {
key := routeGenerator(data)
e.Push(key, data)
}