This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by 
this push:
     new 350767c  rollback
350767c is described below

commit 350767cbc804ad03c5ed9a97dfaab8c651bd3f89
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 20:39:04 2024 +0800

    rollback
---
 pkg/accesslog/collector/connection.go              | 17 +++++------
 pkg/accesslog/collector/process.go                 |  7 +++--
 pkg/accesslog/collector/protocols/queue.go         | 20 ++++++-------
 pkg/accesslog/collector/ztunnel.go                 |  8 +++---
 .../continuous/checker/bpf/network/network.go      |  2 +-
 .../continuous/checker/bpf/network/reader.go       | 10 ++++---
 .../task/network/analyze/layer7/events.go          | 24 +++++++---------
 pkg/tools/btf/linker.go                            | 33 +++++++++++++++-------
 pkg/tools/btf/queue.go                             | 12 ++++----
 9 files changed, 69 insertions(+), 64 deletions(-)

diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index e7bcdf7..7954e2d 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -22,12 +22,10 @@ import (
        "context"
        "encoding/binary"
        "fmt"
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "net"
        "os"
        "sync"
        "time"
-       "unsafe"
 
        "github.com/docker/go-units"
 
@@ -87,17 +85,16 @@ func (c *ConnectCollector) Start(m *module.Manager, ctx 
*common.AccessLogContext
                        return NewConnectionPartitionContext(ctx, 
m.FindModule(process.ModuleName).(process.K8sOperator))
                })
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize),
-               ctx.Config.ConnectionAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
-                       return 
(*events.SocketConnectEvent)(unsafe.Pointer(&r.Sample[0]))
+               ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} 
{
+                       return &events.SocketConnectEvent{}
                }, func(data interface{}) int {
                        return int(data.(*events.SocketConnectEvent).ConID)
                })
-       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize),
-               ctx.Config.ConnectionAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
-                       return 
(*events.SocketCloseEvent)(unsafe.Pointer(&r.Sample[0]))
-               }, func(data interface{}) int {
-                       return int(data.(*events.SocketCloseEvent).ConnectionID)
-               })
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() 
interface{} {
+               return &events.SocketCloseEvent{}
+       }, func(data interface{}) int {
+               return int(data.(*events.SocketCloseEvent).ConnectionID)
+       })
        c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
 
        ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", 
ctx.BPF.TracepointEnterConnect)
diff --git a/pkg/accesslog/collector/process.go 
b/pkg/accesslog/collector/process.go
index 2e5e3de..e4dbdce 100644
--- a/pkg/accesslog/collector/process.go
+++ b/pkg/accesslog/collector/process.go
@@ -20,7 +20,6 @@ package collector
 import (
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/module"
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
 )
 
 var processCollectInstance = NewProcessCollector()
@@ -37,8 +36,10 @@ func (p *ProcessCollector) Start(_ *module.Manager, context 
*common.AccessLogCon
        // monitor process been execute
        context.BPF.AddTracePoint("sched", "sched_process_fork", 
context.BPF.TracepointSchedProcessFork)
 
-       context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(r 
*reader.Reader) {
-               
context.ConnectionMgr.OnNewProcessExecuting(int32(r.ReadUint32()))
+       context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(data 
interface{}) {
+               
context.ConnectionMgr.OnNewProcessExecuting(int32(data.(*ProcessExecuteEvent).PID))
+       }, func() interface{} {
+               return &ProcessExecuteEvent{}
        })
 
        return nil
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index 929d0ef..c3ccd4c 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -21,13 +21,11 @@ import (
        "context"
        "errors"
        "fmt"
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "os"
        "sort"
        "strconv"
        "sync"
        "time"
-       "unsafe"
 
        "github.com/cilium/ebpf"
 
@@ -53,7 +51,7 @@ type AnalyzeQueue struct {
        eventQueue   *btf.EventQueue
        perCPUBuffer int64
 
-       detailReader     func(*reader.Reader) events.SocketDetail
+       detailSupplier   func() events.SocketDetail
        supportAnalyzers func(ctx *common.AccessLogContext) []Protocol
 }
 
@@ -78,8 +76,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) 
(*AnalyzeQueue, error) {
        return &AnalyzeQueue{
                context:      ctx,
                perCPUBuffer: perCPUBufferSize,
-               detailReader: func(r *reader.Reader) events.SocketDetail {
-                       return 
(*events.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
+               detailSupplier: func() events.SocketDetail {
+                       return &events.SocketDetailEvent{}
                },
                supportAnalyzers: func(ctx *common.AccessLogContext) []Protocol 
{
                        return []Protocol{
@@ -97,14 +95,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
                        return NewPartitionContext(q.context, num, 
q.supportAnalyzers(q.context))
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, 
int(q.perCPUBuffer),
-               q.context.Config.ProtocolAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
-                       return q.detailReader(r)
+               q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
+                       return q.detailSupplier()
                }, func(data interface{}) int {
                        return int(data.(events.SocketDetail).GetConnectionID())
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, 
int(q.perCPUBuffer),
-               q.context.Config.ProtocolAnalyze.ParseParallels, func(r 
*reader.Reader) interface{} {
-                       return 
(*events.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
+               q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
+                       return &events.SocketDataUploadEvent{}
                }, func(data interface{}) int {
                        return 
int(data.(*events.SocketDataUploadEvent).ConnectionID)
                })
@@ -112,8 +110,8 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
        q.eventQueue.Start(ctx, q.context.BPF.Linker)
 }
 
-func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func(r *reader.Reader) 
events.SocketDetail) {
-       q.detailReader = supplier
+func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func() 
events.SocketDetail) {
+       q.detailSupplier = supplier
 }
 
 func (q *AnalyzeQueue) ChangeSupportAnalyzers(protocols func(ctx 
*common.AccessLogContext) []Protocol) {
diff --git a/pkg/accesslog/collector/ztunnel.go 
b/pkg/accesslog/collector/ztunnel.go
index 99a6637..f6a02ae 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -20,10 +20,8 @@ package collector
 import (
        "context"
        "fmt"
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "strings"
        "time"
-       "unsafe"
 
        "k8s.io/apimachinery/pkg/util/cache"
 
@@ -76,8 +74,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx 
*common.AccessLogConte
                return err
        }
 
-       ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(r 
*reader.Reader) {
-               event := 
(*events.ZTunnelSocketMappingEvent)(unsafe.Pointer(&r.Sample[0]))
+       ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, 
func(data interface{}) {
+               event := data.(*events.ZTunnelSocketMappingEvent)
                localIP := z.convertBPFIPToString(event.OriginalSrcIP)
                localPort := event.OriginalSrcPort
                remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
@@ -90,6 +88,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx 
*common.AccessLogConte
                        IP:   lbIP,
                        Port: event.LoadBalancedDestPort,
                }, z.ipMappingExpireDuration)
+       }, func() interface{} {
+               return &events.ZTunnelSocketMappingEvent{}
        })
        go func() {
                ticker := time.NewTicker(ZTunnelProcessFinderInterval)
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go 
b/pkg/profiling/continuous/checker/bpf/network/network.go
index a062019..a3cb644 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -148,7 +148,7 @@ func startBPFIfNeed() error {
                        n.ReceiveBufferEvent(event)
                }
        })
-       bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, 
reader.Read, os.Getpagesize()*100, 1)
+       bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, 
reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier)
 
        if err := bpfLinker.HasError(); err != nil {
                _ = bpfLinker.Close()
diff --git a/pkg/profiling/continuous/checker/bpf/network/reader.go 
b/pkg/profiling/continuous/checker/bpf/network/reader.go
index 93ebacf..721e17f 100644
--- a/pkg/profiling/continuous/checker/bpf/network/reader.go
+++ b/pkg/profiling/continuous/checker/bpf/network/reader.go
@@ -18,9 +18,7 @@
 package network
 
 import (
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
-       "unsafe"
 )
 
 type BufferDirection uint8
@@ -57,8 +55,8 @@ func newNetworkBufferReader(notify func(event BufferEvent)) 
*networkBufferReader
        }
 }
 
-func (n *networkBufferReader) Read(data *reader.Reader) {
-       buffer := (*networkBufferInBPF)(unsafe.Pointer(&data.Sample[0]))
+func (n *networkBufferReader) Read(data interface{}) {
+       buffer := data.(*networkBufferInBPF)
        analyzer := protocolAnalyzers[buffer.Protocol]
        if analyzer == nil {
                return
@@ -70,6 +68,10 @@ func (n *networkBufferReader) Read(data *reader.Reader) {
        }
 }
 
+func (n *networkBufferReader) BufferDataBPFSupplier() interface{} {
+       return &networkBufferInBPF{}
+}
+
 type ProtocolAnalyzer interface {
        HandleBufferEvent(buffer *networkBufferInBPF) BufferEvent
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 6bd7dab..1c01084 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -19,8 +19,6 @@ package layer7
 
 import (
        "context"
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
-       "unsafe"
 
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        analyzeBase 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
@@ -38,20 +36,18 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize 
int, config *profili
 
 func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) 
{
        // socket buffer data
-       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer,
-               1, func(r *reader.Reader) interface{} {
-                       return 
(*analyzeBase.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
-               }, func(data interface{}) int {
-                       return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
-               })
+       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
+               return &analyzeBase.SocketDataUploadEvent{}
+       }, func(data interface{}) int {
+               return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
+       })
 
        // socket detail
-       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer,
-               1, func(r *reader.Reader) interface{} {
-                       return 
(*analyzeBase.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
-               }, func(data interface{}) int {
-                       return 
int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
-               })
+       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
+               return &analyzeBase.SocketDetailEvent{}
+       }, func(data interface{}) int {
+               return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
+       })
 
        l.socketDataQueue.Start(ctx, bpfLoader.Linker)
 }
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 852154e..bdbd7a6 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -18,6 +18,8 @@
 package btf
 
 import (
+       "bytes"
+       "encoding/binary"
        "errors"
        "fmt"
        "io"
@@ -42,7 +44,7 @@ import (
 const defaultSymbolPrefix = "sys_"
 
 type LinkFunc func(symbol string, prog *ebpf.Program, opts 
*link.KprobeOptions) (link.Link, error)
-type RingBufferReader func(data *reader.Reader)
+type RingBufferReader func(data interface{})
 
 var syscallPrefix string
 
@@ -159,12 +161,12 @@ func (m *Linker) AddTracePoint(sys, name string, p 
*ebpf.Program) {
        }
 }
 
-func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader) {
-       m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1)
+func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, 
dataSupplier func() interface{}) {
+       m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1, 
dataSupplier)
 }
 
 func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader 
RingBufferReader, perCPUBuffer,
-       parallels int) {
+       parallels int, dataSupplier func() interface{}) {
        rd, err := newQueueReader(emap, perCPUBuffer)
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open ring 
buffer error: %v", err))
@@ -177,11 +179,11 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap 
*ebpf.Map, bufReader RingBuff
        m.closers = append(m.closers, rd)
 
        for i := 0; i < parallels; i++ {
-               m.asyncReadEvent(rd, emap, bufReader)
+               m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
        }
 }
 
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, bufReader 
RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier 
func() interface{}, bufReader RingBufferReader) {
        go func() {
                for {
                        sample, err := rd.Read()
@@ -196,11 +198,22 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap 
*ebpf.Map, bufReader RingBu
                                continue
                        }
 
-                       reader := reader.NewReader(sample)
-                       bufReader(reader)
-                       if reader.HasError() != nil {
-                               log.Warnf("read from %s ringbuffer error: %v", 
emap.String(), reader.HasError())
+                       data := dataSupplier()
+                       if r, ok := data.(reader.EventReader); ok {
+                               sampleReader := reader.NewReader(sample)
+                               r.ReadFrom(sampleReader)
+                               if readErr := sampleReader.HasError(); readErr 
!= nil {
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                                       continue
+                               }
+                       } else {
+                               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                                       continue
+                               }
                        }
+
+                       bufReader(data)
                }
        }()
 }
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 8f9aa27..fdd8754 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,7 +20,6 @@ package btf
 import (
        "context"
        "fmt"
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "sync"
        "time"
 
@@ -121,7 +120,7 @@ type EventQueue struct {
 type mapReceiver struct {
        emap         *ebpf.Map
        perCPUBuffer int
-       dataReader   func(*reader.Reader) interface{}
+       dataSupplier func() interface{}
        router       func(data interface{}) int
        parallels    int
 }
@@ -134,12 +133,12 @@ func NewEventQueue(name string, partitionCount, 
sizePerPartition int, contextGen
        return &EventQueue{name: name, count: partitionCount, partitions: 
partitions}
 }
 
-func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataReader func(r *reader.Reader) interface{},
+func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataSupplier func() interface{},
        routeGenerator func(data interface{}) int) {
        e.receivers = append(e.receivers, &mapReceiver{
                emap:         emap,
                perCPUBuffer: perCPUBufferSize,
-               dataReader:   dataReader,
+               dataSupplier: dataSupplier,
                router:       routeGenerator,
                parallels:    parallels,
        })
@@ -167,10 +166,9 @@ func (e *EventQueue) PartitionContexts() 
[]PartitionContext {
 func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
        for _, r := range e.receivers {
                func(receiver *mapReceiver) {
-                       linker.ReadEventAsyncWithBufferSize(receiver.emap, 
func(r *reader.Reader) {
-                               data := receiver.dataReader(r)
+                       linker.ReadEventAsyncWithBufferSize(receiver.emap, 
func(data interface{}) {
                                e.routerTransformer(data, receiver.router)
-                       }, receiver.perCPUBuffer, r.parallels)
+                       }, receiver.perCPUBuffer, r.parallels, 
receiver.dataSupplier)
                }(r)
        }
 

Reply via email to