This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 0542172 trying to use unsafe.reader
0542172 is described below
commit 05421729ad12ce7ab422e9e0c08e72d9e8f04b07
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 19:27:53 2024 +0800
trying to use unsafe.reader
---
pkg/accesslog/collector/connection.go | 17 ++++++-----
pkg/accesslog/collector/process.go | 7 ++---
pkg/accesslog/collector/protocols/queue.go | 20 +++++++------
pkg/accesslog/collector/ztunnel.go | 8 +++---
.../continuous/checker/bpf/network/network.go | 2 +-
.../continuous/checker/bpf/network/reader.go | 10 +++----
.../task/network/analyze/layer7/events.go | 24 +++++++++-------
pkg/tools/btf/linker.go | 33 +++++++---------------
pkg/tools/btf/queue.go | 12 ++++----
9 files changed, 64 insertions(+), 69 deletions(-)
diff --git a/pkg/accesslog/collector/connection.go
b/pkg/accesslog/collector/connection.go
index 7954e2d..e7bcdf7 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -22,10 +22,12 @@ import (
"context"
"encoding/binary"
"fmt"
+ "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"net"
"os"
"sync"
"time"
+ "unsafe"
"github.com/docker/go-units"
@@ -85,16 +87,17 @@ func (c *ConnectCollector) Start(m *module.Manager, ctx
*common.AccessLogContext
return NewConnectionPartitionContext(ctx,
m.FindModule(process.ModuleName).(process.K8sOperator))
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize),
- ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{}
{
- return &events.SocketConnectEvent{}
+ ctx.Config.ConnectionAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
+ return
(*events.SocketConnectEvent)(unsafe.Pointer(&r.Sample[0]))
}, func(data interface{}) int {
return int(data.(*events.SocketConnectEvent).ConID)
})
- c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func()
interface{} {
- return &events.SocketCloseEvent{}
- }, func(data interface{}) int {
- return int(data.(*events.SocketCloseEvent).ConnectionID)
- })
+ c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize),
+ ctx.Config.ConnectionAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
+ return
(*events.SocketCloseEvent)(unsafe.Pointer(&r.Sample[0]))
+ }, func(data interface{}) int {
+ return int(data.(*events.SocketCloseEvent).ConnectionID)
+ })
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect",
ctx.BPF.TracepointEnterConnect)
diff --git a/pkg/accesslog/collector/process.go
b/pkg/accesslog/collector/process.go
index e4dbdce..2e5e3de 100644
--- a/pkg/accesslog/collector/process.go
+++ b/pkg/accesslog/collector/process.go
@@ -20,6 +20,7 @@ package collector
import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
)
var processCollectInstance = NewProcessCollector()
@@ -36,10 +37,8 @@ func (p *ProcessCollector) Start(_ *module.Manager, context
*common.AccessLogCon
// monitor process been execute
context.BPF.AddTracePoint("sched", "sched_process_fork",
context.BPF.TracepointSchedProcessFork)
- context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(data
interface{}) {
-
context.ConnectionMgr.OnNewProcessExecuting(int32(data.(*ProcessExecuteEvent).PID))
- }, func() interface{} {
- return &ProcessExecuteEvent{}
+ context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(r
*reader.Reader) {
+
context.ConnectionMgr.OnNewProcessExecuting(int32(r.ReadUint32()))
})
return nil
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index c3ccd4c..929d0ef 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -21,11 +21,13 @@ import (
"context"
"errors"
"fmt"
+ "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"os"
"sort"
"strconv"
"sync"
"time"
+ "unsafe"
"github.com/cilium/ebpf"
@@ -51,7 +53,7 @@ type AnalyzeQueue struct {
eventQueue *btf.EventQueue
perCPUBuffer int64
- detailSupplier func() events.SocketDetail
+ detailReader func(*reader.Reader) events.SocketDetail
supportAnalyzers func(ctx *common.AccessLogContext) []Protocol
}
@@ -76,8 +78,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext)
(*AnalyzeQueue, error) {
return &AnalyzeQueue{
context: ctx,
perCPUBuffer: perCPUBufferSize,
- detailSupplier: func() events.SocketDetail {
- return &events.SocketDetailEvent{}
+ detailReader: func(r *reader.Reader) events.SocketDetail {
+ return
(*events.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
},
supportAnalyzers: func(ctx *common.AccessLogContext) []Protocol
{
return []Protocol{
@@ -95,14 +97,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
return NewPartitionContext(q.context, num,
q.supportAnalyzers(q.context))
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue,
int(q.perCPUBuffer),
- q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
- return q.detailSupplier()
+ q.context.Config.ProtocolAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
+ return q.detailReader(r)
}, func(data interface{}) int {
return int(data.(events.SocketDetail).GetConnectionID())
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue,
int(q.perCPUBuffer),
- q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
- return &events.SocketDataUploadEvent{}
+ q.context.Config.ProtocolAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
+ return
(*events.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
}, func(data interface{}) int {
return
int(data.(*events.SocketDataUploadEvent).ConnectionID)
})
@@ -110,8 +112,8 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.Start(ctx, q.context.BPF.Linker)
}
-func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func()
events.SocketDetail) {
- q.detailSupplier = supplier
+func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func(r *reader.Reader)
events.SocketDetail) {
+ q.detailReader = supplier
}
func (q *AnalyzeQueue) ChangeSupportAnalyzers(protocols func(ctx
*common.AccessLogContext) []Protocol) {
diff --git a/pkg/accesslog/collector/ztunnel.go
b/pkg/accesslog/collector/ztunnel.go
index f6a02ae..99a6637 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -20,8 +20,10 @@ package collector
import (
"context"
"fmt"
+ "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"strings"
"time"
+ "unsafe"
"k8s.io/apimachinery/pkg/util/cache"
@@ -74,8 +76,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx
*common.AccessLogConte
return err
}
- ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue,
func(data interface{}) {
- event := data.(*events.ZTunnelSocketMappingEvent)
+ ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(r
*reader.Reader) {
+ event :=
(*events.ZTunnelSocketMappingEvent)(unsafe.Pointer(&r.Sample[0]))
localIP := z.convertBPFIPToString(event.OriginalSrcIP)
localPort := event.OriginalSrcPort
remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
@@ -88,8 +90,6 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx
*common.AccessLogConte
IP: lbIP,
Port: event.LoadBalancedDestPort,
}, z.ipMappingExpireDuration)
- }, func() interface{} {
- return &events.ZTunnelSocketMappingEvent{}
})
go func() {
ticker := time.NewTicker(ZTunnelProcessFinderInterval)
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go
b/pkg/profiling/continuous/checker/bpf/network/network.go
index a3cb644..a062019 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -148,7 +148,7 @@ func startBPFIfNeed() error {
n.ReceiveBufferEvent(event)
}
})
- bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue,
reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier)
+ bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue,
reader.Read, os.Getpagesize()*100, 1)
if err := bpfLinker.HasError(); err != nil {
_ = bpfLinker.Close()
diff --git a/pkg/profiling/continuous/checker/bpf/network/reader.go
b/pkg/profiling/continuous/checker/bpf/network/reader.go
index 721e17f..93ebacf 100644
--- a/pkg/profiling/continuous/checker/bpf/network/reader.go
+++ b/pkg/profiling/continuous/checker/bpf/network/reader.go
@@ -18,7 +18,9 @@
package network
import (
+ "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/enums"
+ "unsafe"
)
type BufferDirection uint8
@@ -55,8 +57,8 @@ func newNetworkBufferReader(notify func(event BufferEvent))
*networkBufferReader
}
}
-func (n *networkBufferReader) Read(data interface{}) {
- buffer := data.(*networkBufferInBPF)
+func (n *networkBufferReader) Read(data *reader.Reader) {
+ buffer := (*networkBufferInBPF)(unsafe.Pointer(&data.Sample[0]))
analyzer := protocolAnalyzers[buffer.Protocol]
if analyzer == nil {
return
@@ -68,10 +70,6 @@ func (n *networkBufferReader) Read(data interface{}) {
}
}
-func (n *networkBufferReader) BufferDataBPFSupplier() interface{} {
- return &networkBufferInBPF{}
-}
-
type ProtocolAnalyzer interface {
HandleBufferEvent(buffer *networkBufferInBPF) BufferEvent
}
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go
b/pkg/profiling/task/network/analyze/layer7/events.go
index 1c01084..6bd7dab 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -19,6 +19,8 @@ package layer7
import (
"context"
+ "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+ "unsafe"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
analyzeBase
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
@@ -36,18 +38,20 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize
int, config *profili
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader)
{
// socket buffer data
- l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
- return &analyzeBase.SocketDataUploadEvent{}
- }, func(data interface{}) int {
- return
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
- })
+ l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue,
l.protocolPerCPUBuffer,
+ 1, func(r *reader.Reader) interface{} {
+ return
(*analyzeBase.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
+ }, func(data interface{}) int {
+ return
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
+ })
// socket detail
- l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
- return &analyzeBase.SocketDetailEvent{}
- }, func(data interface{}) int {
- return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
- })
+ l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer,
+ 1, func(r *reader.Reader) interface{} {
+ return
(*analyzeBase.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
+ }, func(data interface{}) int {
+ return
int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
+ })
l.socketDataQueue.Start(ctx, bpfLoader.Linker)
}
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..852154e 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -18,8 +18,6 @@
package btf
import (
- "bytes"
- "encoding/binary"
"errors"
"fmt"
"io"
@@ -44,7 +42,7 @@ import (
const defaultSymbolPrefix = "sys_"
type LinkFunc func(symbol string, prog *ebpf.Program, opts
*link.KprobeOptions) (link.Link, error)
-type RingBufferReader func(data interface{})
+type RingBufferReader func(data *reader.Reader)
var syscallPrefix string
@@ -161,12 +159,12 @@ func (m *Linker) AddTracePoint(sys, name string, p
*ebpf.Program) {
}
}
-func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader,
dataSupplier func() interface{}) {
- m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1,
dataSupplier)
+func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader) {
+ m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1)
}
func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader
RingBufferReader, perCPUBuffer,
- parallels int, dataSupplier func() interface{}) {
+ parallels int) {
rd, err := newQueueReader(emap, perCPUBuffer)
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open ring
buffer error: %v", err))
@@ -179,11 +177,11 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap
*ebpf.Map, bufReader RingBuff
m.closers = append(m.closers, rd)
for i := 0; i < parallels; i++ {
- m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+ m.asyncReadEvent(rd, emap, bufReader)
}
}
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier
func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, bufReader
RingBufferReader) {
go func() {
for {
sample, err := rd.Read()
@@ -198,22 +196,11 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap
*ebpf.Map, dataSupplier fun
continue
}
- data := dataSupplier()
- if r, ok := data.(reader.EventReader); ok {
- sampleReader := reader.NewReader(sample)
- r.ReadFrom(sampleReader)
- if readErr := sampleReader.HasError(); readErr
!= nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
- continue
- }
- } else {
- if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
- continue
- }
+ reader := reader.NewReader(sample)
+ bufReader(reader)
+ if reader.HasError() != nil {
+ log.Warnf("read from %s ringbuffer error: %v",
emap.String(), reader.HasError())
}
-
- bufReader(data)
}
}()
}
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index fdd8754..8f9aa27 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,6 +20,7 @@ package btf
import (
"context"
"fmt"
+ "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"sync"
"time"
@@ -120,7 +121,7 @@ type EventQueue struct {
type mapReceiver struct {
emap *ebpf.Map
perCPUBuffer int
- dataSupplier func() interface{}
+ dataReader func(*reader.Reader) interface{}
router func(data interface{}) int
parallels int
}
@@ -133,12 +134,12 @@ func NewEventQueue(name string, partitionCount,
sizePerPartition int, contextGen
return &EventQueue{name: name, count: partitionCount, partitions:
partitions}
}
-func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize,
parallels int, dataSupplier func() interface{},
+func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize,
parallels int, dataReader func(r *reader.Reader) interface{},
routeGenerator func(data interface{}) int) {
e.receivers = append(e.receivers, &mapReceiver{
emap: emap,
perCPUBuffer: perCPUBufferSize,
- dataSupplier: dataSupplier,
+ dataReader: dataReader,
router: routeGenerator,
parallels: parallels,
})
@@ -166,9 +167,10 @@ func (e *EventQueue) PartitionContexts()
[]PartitionContext {
func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
for _, r := range e.receivers {
func(receiver *mapReceiver) {
- linker.ReadEventAsyncWithBufferSize(receiver.emap,
func(data interface{}) {
+ linker.ReadEventAsyncWithBufferSize(receiver.emap,
func(r *reader.Reader) {
+ data := receiver.dataReader(r)
e.routerTransformer(data, receiver.router)
- }, receiver.perCPUBuffer, r.parallels,
receiver.dataSupplier)
+ }, receiver.perCPUBuffer, r.parallels)
}(r)
}