This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by 
this push:
     new 0542172  trying to use unsafe.reader
0542172 is described below

commit 05421729ad12ce7ab422e9e0c08e72d9e8f04b07
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 19:27:53 2024 +0800

    trying to use unsafe.reader
---
 pkg/accesslog/collector/connection.go              | 17 ++++++-----
 pkg/accesslog/collector/process.go                 |  7 ++---
 pkg/accesslog/collector/protocols/queue.go         | 20 +++++++------
 pkg/accesslog/collector/ztunnel.go                 |  8 +++---
 .../continuous/checker/bpf/network/network.go      |  2 +-
 .../continuous/checker/bpf/network/reader.go       | 10 +++----
 .../task/network/analyze/layer7/events.go          | 24 +++++++++-------
 pkg/tools/btf/linker.go                            | 33 +++++++---------------
 pkg/tools/btf/queue.go                             | 12 ++++----
 9 files changed, 64 insertions(+), 69 deletions(-)

diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index 7954e2d..e7bcdf7 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -22,10 +22,12 @@ import (
        "context"
        "encoding/binary"
        "fmt"
+       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "net"
        "os"
        "sync"
        "time"
+       "unsafe"
 
        "github.com/docker/go-units"
 
@@ -85,16 +87,17 @@ func (c *ConnectCollector) Start(m *module.Manager, ctx 
*common.AccessLogContext
                        return NewConnectionPartitionContext(ctx, 
m.FindModule(process.ModuleName).(process.K8sOperator))
                })
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize),
-               ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} 
{
-                       return &events.SocketConnectEvent{}
+               ctx.Config.ConnectionAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
+                       return 
(*events.SocketConnectEvent)(unsafe.Pointer(&r.Sample[0]))
                }, func(data interface{}) int {
                        return int(data.(*events.SocketConnectEvent).ConID)
                })
-       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() 
interface{} {
-               return &events.SocketCloseEvent{}
-       }, func(data interface{}) int {
-               return int(data.(*events.SocketCloseEvent).ConnectionID)
-       })
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize),
+               ctx.Config.ConnectionAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
+                       return 
(*events.SocketCloseEvent)(unsafe.Pointer(&r.Sample[0]))
+               }, func(data interface{}) int {
+                       return int(data.(*events.SocketCloseEvent).ConnectionID)
+               })
        c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
 
        ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", 
ctx.BPF.TracepointEnterConnect)
diff --git a/pkg/accesslog/collector/process.go 
b/pkg/accesslog/collector/process.go
index e4dbdce..2e5e3de 100644
--- a/pkg/accesslog/collector/process.go
+++ b/pkg/accesslog/collector/process.go
@@ -20,6 +20,7 @@ package collector
 import (
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/module"
+       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
 )
 
 var processCollectInstance = NewProcessCollector()
@@ -36,10 +37,8 @@ func (p *ProcessCollector) Start(_ *module.Manager, context 
*common.AccessLogCon
        // monitor process been execute
        context.BPF.AddTracePoint("sched", "sched_process_fork", 
context.BPF.TracepointSchedProcessFork)
 
-       context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(data 
interface{}) {
-               
context.ConnectionMgr.OnNewProcessExecuting(int32(data.(*ProcessExecuteEvent).PID))
-       }, func() interface{} {
-               return &ProcessExecuteEvent{}
+       context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(r 
*reader.Reader) {
+               
context.ConnectionMgr.OnNewProcessExecuting(int32(r.ReadUint32()))
        })
 
        return nil
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index c3ccd4c..929d0ef 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -21,11 +21,13 @@ import (
        "context"
        "errors"
        "fmt"
+       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "os"
        "sort"
        "strconv"
        "sync"
        "time"
+       "unsafe"
 
        "github.com/cilium/ebpf"
 
@@ -51,7 +53,7 @@ type AnalyzeQueue struct {
        eventQueue   *btf.EventQueue
        perCPUBuffer int64
 
-       detailSupplier   func() events.SocketDetail
+       detailReader     func(*reader.Reader) events.SocketDetail
        supportAnalyzers func(ctx *common.AccessLogContext) []Protocol
 }
 
@@ -76,8 +78,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) 
(*AnalyzeQueue, error) {
        return &AnalyzeQueue{
                context:      ctx,
                perCPUBuffer: perCPUBufferSize,
-               detailSupplier: func() events.SocketDetail {
-                       return &events.SocketDetailEvent{}
+               detailReader: func(r *reader.Reader) events.SocketDetail {
+                       return 
(*events.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
                },
                supportAnalyzers: func(ctx *common.AccessLogContext) []Protocol 
{
                        return []Protocol{
@@ -95,14 +97,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
                        return NewPartitionContext(q.context, num, 
q.supportAnalyzers(q.context))
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, 
int(q.perCPUBuffer),
-               q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
-                       return q.detailSupplier()
+               q.context.Config.ProtocolAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
+                       return q.detailReader(r)
                }, func(data interface{}) int {
                        return int(data.(events.SocketDetail).GetConnectionID())
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, 
int(q.perCPUBuffer),
-               q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
-                       return &events.SocketDataUploadEvent{}
+               q.context.Config.ProtocolAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
+                       return 
(*events.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
                }, func(data interface{}) int {
                        return 
int(data.(*events.SocketDataUploadEvent).ConnectionID)
                })
@@ -110,8 +112,8 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
        q.eventQueue.Start(ctx, q.context.BPF.Linker)
 }
 
-func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func() 
events.SocketDetail) {
-       q.detailSupplier = supplier
+func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func(r *reader.Reader) 
events.SocketDetail) {
+       q.detailReader = supplier
 }
 
 func (q *AnalyzeQueue) ChangeSupportAnalyzers(protocols func(ctx 
*common.AccessLogContext) []Protocol) {
diff --git a/pkg/accesslog/collector/ztunnel.go 
b/pkg/accesslog/collector/ztunnel.go
index f6a02ae..99a6637 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -20,8 +20,10 @@ package collector
 import (
        "context"
        "fmt"
+       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "strings"
        "time"
+       "unsafe"
 
        "k8s.io/apimachinery/pkg/util/cache"
 
@@ -74,8 +76,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx 
*common.AccessLogConte
                return err
        }
 
-       ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, 
func(data interface{}) {
-               event := data.(*events.ZTunnelSocketMappingEvent)
+       ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(r 
*reader.Reader) {
+               event := 
(*events.ZTunnelSocketMappingEvent)(unsafe.Pointer(&r.Sample[0]))
                localIP := z.convertBPFIPToString(event.OriginalSrcIP)
                localPort := event.OriginalSrcPort
                remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
@@ -88,8 +90,6 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx 
*common.AccessLogConte
                        IP:   lbIP,
                        Port: event.LoadBalancedDestPort,
                }, z.ipMappingExpireDuration)
-       }, func() interface{} {
-               return &events.ZTunnelSocketMappingEvent{}
        })
        go func() {
                ticker := time.NewTicker(ZTunnelProcessFinderInterval)
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go 
b/pkg/profiling/continuous/checker/bpf/network/network.go
index a3cb644..a062019 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -148,7 +148,7 @@ func startBPFIfNeed() error {
                        n.ReceiveBufferEvent(event)
                }
        })
-       bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, 
reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier)
+       bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, 
reader.Read, os.Getpagesize()*100, 1)
 
        if err := bpfLinker.HasError(); err != nil {
                _ = bpfLinker.Close()
diff --git a/pkg/profiling/continuous/checker/bpf/network/reader.go 
b/pkg/profiling/continuous/checker/bpf/network/reader.go
index 721e17f..93ebacf 100644
--- a/pkg/profiling/continuous/checker/bpf/network/reader.go
+++ b/pkg/profiling/continuous/checker/bpf/network/reader.go
@@ -18,7 +18,9 @@
 package network
 
 import (
+       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
+       "unsafe"
 )
 
 type BufferDirection uint8
@@ -55,8 +57,8 @@ func newNetworkBufferReader(notify func(event BufferEvent)) 
*networkBufferReader
        }
 }
 
-func (n *networkBufferReader) Read(data interface{}) {
-       buffer := data.(*networkBufferInBPF)
+func (n *networkBufferReader) Read(data *reader.Reader) {
+       buffer := (*networkBufferInBPF)(unsafe.Pointer(&data.Sample[0]))
        analyzer := protocolAnalyzers[buffer.Protocol]
        if analyzer == nil {
                return
@@ -68,10 +70,6 @@ func (n *networkBufferReader) Read(data interface{}) {
        }
 }
 
-func (n *networkBufferReader) BufferDataBPFSupplier() interface{} {
-       return &networkBufferInBPF{}
-}
-
 type ProtocolAnalyzer interface {
        HandleBufferEvent(buffer *networkBufferInBPF) BufferEvent
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 1c01084..6bd7dab 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -19,6 +19,8 @@ package layer7
 
 import (
        "context"
+       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+       "unsafe"
 
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        analyzeBase 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
@@ -36,18 +38,20 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize 
int, config *profili
 
 func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) 
{
        // socket buffer data
-       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
-               return &analyzeBase.SocketDataUploadEvent{}
-       }, func(data interface{}) int {
-               return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
-       })
+       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer,
+               1, func(r *reader.Reader) interface{} {
+                       return 
(*analyzeBase.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
+               }, func(data interface{}) int {
+                       return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
+               })
 
        // socket detail
-       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
-               return &analyzeBase.SocketDetailEvent{}
-       }, func(data interface{}) int {
-               return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
-       })
+       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer,
+               1, func(r *reader.Reader) interface{} {
+                       return 
(*analyzeBase.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
+               }, func(data interface{}) int {
+                       return 
int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
+               })
 
        l.socketDataQueue.Start(ctx, bpfLoader.Linker)
 }
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..852154e 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -18,8 +18,6 @@
 package btf
 
 import (
-       "bytes"
-       "encoding/binary"
        "errors"
        "fmt"
        "io"
@@ -44,7 +42,7 @@ import (
 const defaultSymbolPrefix = "sys_"
 
 type LinkFunc func(symbol string, prog *ebpf.Program, opts 
*link.KprobeOptions) (link.Link, error)
-type RingBufferReader func(data interface{})
+type RingBufferReader func(data *reader.Reader)
 
 var syscallPrefix string
 
@@ -161,12 +159,12 @@ func (m *Linker) AddTracePoint(sys, name string, p 
*ebpf.Program) {
        }
 }
 
-func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, 
dataSupplier func() interface{}) {
-       m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1, 
dataSupplier)
+func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader) {
+       m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1)
 }
 
 func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader 
RingBufferReader, perCPUBuffer,
-       parallels int, dataSupplier func() interface{}) {
+       parallels int) {
        rd, err := newQueueReader(emap, perCPUBuffer)
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open ring 
buffer error: %v", err))
@@ -179,11 +177,11 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap 
*ebpf.Map, bufReader RingBuff
        m.closers = append(m.closers, rd)
 
        for i := 0; i < parallels; i++ {
-               m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+               m.asyncReadEvent(rd, emap, bufReader)
        }
 }
 
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier 
func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, bufReader 
RingBufferReader) {
        go func() {
                for {
                        sample, err := rd.Read()
@@ -198,22 +196,11 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap 
*ebpf.Map, dataSupplier fun
                                continue
                        }
 
-                       data := dataSupplier()
-                       if r, ok := data.(reader.EventReader); ok {
-                               sampleReader := reader.NewReader(sample)
-                               r.ReadFrom(sampleReader)
-                               if readErr := sampleReader.HasError(); readErr 
!= nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
-                                       continue
-                               }
-                       } else {
-                               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
-                                       continue
-                               }
+                       reader := reader.NewReader(sample)
+                       bufReader(reader)
+                       if reader.HasError() != nil {
+                               log.Warnf("read from %s ringbuffer error: %v", 
emap.String(), reader.HasError())
                        }
-
-                       bufReader(data)
                }
        }()
 }
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index fdd8754..8f9aa27 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,6 +20,7 @@ package btf
 import (
        "context"
        "fmt"
+       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "sync"
        "time"
 
@@ -120,7 +121,7 @@ type EventQueue struct {
 type mapReceiver struct {
        emap         *ebpf.Map
        perCPUBuffer int
-       dataSupplier func() interface{}
+       dataReader   func(*reader.Reader) interface{}
        router       func(data interface{}) int
        parallels    int
 }
@@ -133,12 +134,12 @@ func NewEventQueue(name string, partitionCount, 
sizePerPartition int, contextGen
        return &EventQueue{name: name, count: partitionCount, partitions: 
partitions}
 }
 
-func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataSupplier func() interface{},
+func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataReader func(r *reader.Reader) interface{},
        routeGenerator func(data interface{}) int) {
        e.receivers = append(e.receivers, &mapReceiver{
                emap:         emap,
                perCPUBuffer: perCPUBufferSize,
-               dataSupplier: dataSupplier,
+               dataReader:   dataReader,
                router:       routeGenerator,
                parallels:    parallels,
        })
@@ -166,9 +167,10 @@ func (e *EventQueue) PartitionContexts() 
[]PartitionContext {
 func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
        for _, r := range e.receivers {
                func(receiver *mapReceiver) {
-                       linker.ReadEventAsyncWithBufferSize(receiver.emap, 
func(data interface{}) {
+                       linker.ReadEventAsyncWithBufferSize(receiver.emap, 
func(r *reader.Reader) {
+                               data := receiver.dataReader(r)
                                e.routerTransformer(data, receiver.router)
-                       }, receiver.perCPUBuffer, r.parallels, 
receiver.dataSupplier)
+                       }, receiver.perCPUBuffer, r.parallels)
                }(r)
        }
 

Reply via email to