This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 350767c rollback
350767c is described below
commit 350767cbc804ad03c5ed9a97dfaab8c651bd3f89
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 20:39:04 2024 +0800
rollback
---
pkg/accesslog/collector/connection.go | 17 +++++------
pkg/accesslog/collector/process.go | 7 +++--
pkg/accesslog/collector/protocols/queue.go | 20 ++++++-------
pkg/accesslog/collector/ztunnel.go | 8 +++---
.../continuous/checker/bpf/network/network.go | 2 +-
.../continuous/checker/bpf/network/reader.go | 10 ++++---
.../task/network/analyze/layer7/events.go | 24 +++++++---------
pkg/tools/btf/linker.go | 33 +++++++++++++++-------
pkg/tools/btf/queue.go | 12 ++++----
9 files changed, 69 insertions(+), 64 deletions(-)
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index e7bcdf7..7954e2d 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -22,12 +22,10 @@ import (
"context"
"encoding/binary"
"fmt"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"net"
"os"
"sync"
"time"
- "unsafe"
"github.com/docker/go-units"
@@ -87,17 +85,16 @@ func (c *ConnectCollector) Start(m *module.Manager, ctx *common.AccessLogContext
return NewConnectionPartitionContext(ctx,
m.FindModule(process.ModuleName).(process.K8sOperator))
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize),
- ctx.Config.ConnectionAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
- return
(*events.SocketConnectEvent)(unsafe.Pointer(&r.Sample[0]))
+ ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{}
{
+ return &events.SocketConnectEvent{}
}, func(data interface{}) int {
return int(data.(*events.SocketConnectEvent).ConID)
})
- c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize),
- ctx.Config.ConnectionAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
- return
(*events.SocketCloseEvent)(unsafe.Pointer(&r.Sample[0]))
- }, func(data interface{}) int {
- return int(data.(*events.SocketCloseEvent).ConnectionID)
- })
+ c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func()
interface{} {
+ return &events.SocketCloseEvent{}
+ }, func(data interface{}) int {
+ return int(data.(*events.SocketCloseEvent).ConnectionID)
+ })
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect",
ctx.BPF.TracepointEnterConnect)
diff --git a/pkg/accesslog/collector/process.go b/pkg/accesslog/collector/process.go
index 2e5e3de..e4dbdce 100644
--- a/pkg/accesslog/collector/process.go
+++ b/pkg/accesslog/collector/process.go
@@ -20,7 +20,6 @@ package collector
import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/module"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
)
var processCollectInstance = NewProcessCollector()
@@ -37,8 +36,10 @@ func (p *ProcessCollector) Start(_ *module.Manager, context *common.AccessLogCon
// monitor process been execute
context.BPF.AddTracePoint("sched", "sched_process_fork",
context.BPF.TracepointSchedProcessFork)
- context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(r
*reader.Reader) {
-
context.ConnectionMgr.OnNewProcessExecuting(int32(r.ReadUint32()))
+ context.BPF.ReadEventAsync(context.BPF.ProcessExecuteQueue, func(data
interface{}) {
+
context.ConnectionMgr.OnNewProcessExecuting(int32(data.(*ProcessExecuteEvent).PID))
+ }, func() interface{} {
+ return &ProcessExecuteEvent{}
})
return nil
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 929d0ef..c3ccd4c 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -21,13 +21,11 @@ import (
"context"
"errors"
"fmt"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"os"
"sort"
"strconv"
"sync"
"time"
- "unsafe"
"github.com/cilium/ebpf"
@@ -53,7 +51,7 @@ type AnalyzeQueue struct {
eventQueue *btf.EventQueue
perCPUBuffer int64
- detailReader func(*reader.Reader) events.SocketDetail
+ detailSupplier func() events.SocketDetail
supportAnalyzers func(ctx *common.AccessLogContext) []Protocol
}
@@ -78,8 +76,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
return &AnalyzeQueue{
context: ctx,
perCPUBuffer: perCPUBufferSize,
- detailReader: func(r *reader.Reader) events.SocketDetail {
- return
(*events.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
+ detailSupplier: func() events.SocketDetail {
+ return &events.SocketDetailEvent{}
},
supportAnalyzers: func(ctx *common.AccessLogContext) []Protocol
{
return []Protocol{
@@ -97,14 +95,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
return NewPartitionContext(q.context, num,
q.supportAnalyzers(q.context))
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue,
int(q.perCPUBuffer),
- q.context.Config.ProtocolAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
- return q.detailReader(r)
+ q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
+ return q.detailSupplier()
}, func(data interface{}) int {
return int(data.(events.SocketDetail).GetConnectionID())
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue,
int(q.perCPUBuffer),
- q.context.Config.ProtocolAnalyze.ParseParallels, func(r
*reader.Reader) interface{} {
- return
(*events.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
+ q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
+ return &events.SocketDataUploadEvent{}
}, func(data interface{}) int {
return
int(data.(*events.SocketDataUploadEvent).ConnectionID)
})
@@ -112,8 +110,8 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.Start(ctx, q.context.BPF.Linker)
}
-func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func(r *reader.Reader)
events.SocketDetail) {
- q.detailReader = supplier
+func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func()
events.SocketDetail) {
+ q.detailSupplier = supplier
}
func (q *AnalyzeQueue) ChangeSupportAnalyzers(protocols func(ctx
*common.AccessLogContext) []Protocol) {
diff --git a/pkg/accesslog/collector/ztunnel.go b/pkg/accesslog/collector/ztunnel.go
index 99a6637..f6a02ae 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -20,10 +20,8 @@ package collector
import (
"context"
"fmt"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"strings"
"time"
- "unsafe"
"k8s.io/apimachinery/pkg/util/cache"
@@ -76,8 +74,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogConte
return err
}
- ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(r
*reader.Reader) {
- event :=
(*events.ZTunnelSocketMappingEvent)(unsafe.Pointer(&r.Sample[0]))
+ ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue,
func(data interface{}) {
+ event := data.(*events.ZTunnelSocketMappingEvent)
localIP := z.convertBPFIPToString(event.OriginalSrcIP)
localPort := event.OriginalSrcPort
remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
@@ -90,6 +88,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogConte
IP: lbIP,
Port: event.LoadBalancedDestPort,
}, z.ipMappingExpireDuration)
+ }, func() interface{} {
+ return &events.ZTunnelSocketMappingEvent{}
})
go func() {
ticker := time.NewTicker(ZTunnelProcessFinderInterval)
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go
index a062019..a3cb644 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -148,7 +148,7 @@ func startBPFIfNeed() error {
n.ReceiveBufferEvent(event)
}
})
- bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue,
reader.Read, os.Getpagesize()*100, 1)
+ bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue,
reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier)
if err := bpfLinker.HasError(); err != nil {
_ = bpfLinker.Close()
diff --git a/pkg/profiling/continuous/checker/bpf/network/reader.go b/pkg/profiling/continuous/checker/bpf/network/reader.go
index 93ebacf..721e17f 100644
--- a/pkg/profiling/continuous/checker/bpf/network/reader.go
+++ b/pkg/profiling/continuous/checker/bpf/network/reader.go
@@ -18,9 +18,7 @@
package network
import (
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/enums"
- "unsafe"
)
type BufferDirection uint8
@@ -57,8 +55,8 @@ func newNetworkBufferReader(notify func(event BufferEvent)) *networkBufferReader
}
}
-func (n *networkBufferReader) Read(data *reader.Reader) {
- buffer := (*networkBufferInBPF)(unsafe.Pointer(&data.Sample[0]))
+func (n *networkBufferReader) Read(data interface{}) {
+ buffer := data.(*networkBufferInBPF)
analyzer := protocolAnalyzers[buffer.Protocol]
if analyzer == nil {
return
@@ -70,6 +68,10 @@ func (n *networkBufferReader) Read(data *reader.Reader) {
}
}
+func (n *networkBufferReader) BufferDataBPFSupplier() interface{} {
+ return &networkBufferInBPF{}
+}
+
type ProtocolAnalyzer interface {
HandleBufferEvent(buffer *networkBufferInBPF) BufferEvent
}
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 6bd7dab..1c01084 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -19,8 +19,6 @@ package layer7
import (
"context"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
- "unsafe"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
analyzeBase
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
@@ -38,20 +36,18 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader)
{
// socket buffer data
- l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue,
l.protocolPerCPUBuffer,
- 1, func(r *reader.Reader) interface{} {
- return
(*analyzeBase.SocketDataUploadEvent)(unsafe.Pointer(&r.Sample[0]))
- }, func(data interface{}) int {
- return
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
- })
+ l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
+ return &analyzeBase.SocketDataUploadEvent{}
+ }, func(data interface{}) int {
+ return
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
+ })
// socket detail
- l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer,
- 1, func(r *reader.Reader) interface{} {
- return
(*analyzeBase.SocketDetailEvent)(unsafe.Pointer(&r.Sample[0]))
- }, func(data interface{}) int {
- return
int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
- })
+ l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
+ return &analyzeBase.SocketDetailEvent{}
+ }, func(data interface{}) int {
+ return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
+ })
l.socketDataQueue.Start(ctx, bpfLoader.Linker)
}
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 852154e..bdbd7a6 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -18,6 +18,8 @@
package btf
import (
+ "bytes"
+ "encoding/binary"
"errors"
"fmt"
"io"
@@ -42,7 +44,7 @@ import (
const defaultSymbolPrefix = "sys_"
type LinkFunc func(symbol string, prog *ebpf.Program, opts
*link.KprobeOptions) (link.Link, error)
-type RingBufferReader func(data *reader.Reader)
+type RingBufferReader func(data interface{})
var syscallPrefix string
@@ -159,12 +161,12 @@ func (m *Linker) AddTracePoint(sys, name string, p *ebpf.Program) {
}
}
-func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader) {
- m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1)
+func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader,
dataSupplier func() interface{}) {
+ m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1,
dataSupplier)
}
func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader
RingBufferReader, perCPUBuffer,
- parallels int) {
+ parallels int, dataSupplier func() interface{}) {
rd, err := newQueueReader(emap, perCPUBuffer)
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open ring
buffer error: %v", err))
@@ -177,11 +179,11 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBuff
m.closers = append(m.closers, rd)
for i := 0; i < parallels; i++ {
- m.asyncReadEvent(rd, emap, bufReader)
+ m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
}
}
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, bufReader
RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier
func() interface{}, bufReader RingBufferReader) {
go func() {
for {
sample, err := rd.Read()
@@ -196,11 +198,22 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, bufReader RingBu
continue
}
- reader := reader.NewReader(sample)
- bufReader(reader)
- if reader.HasError() != nil {
- log.Warnf("read from %s ringbuffer error: %v",
emap.String(), reader.HasError())
+ data := dataSupplier()
+ if r, ok := data.(reader.EventReader); ok {
+ sampleReader := reader.NewReader(sample)
+ r.ReadFrom(sampleReader)
+ if readErr := sampleReader.HasError(); readErr
!= nil {
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ continue
+ }
+ } else {
+ if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ continue
+ }
}
+
+ bufReader(data)
}
}()
}
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 8f9aa27..fdd8754 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,7 +20,6 @@ package btf
import (
"context"
"fmt"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"sync"
"time"
@@ -121,7 +120,7 @@ type EventQueue struct {
type mapReceiver struct {
emap *ebpf.Map
perCPUBuffer int
- dataReader func(*reader.Reader) interface{}
+ dataSupplier func() interface{}
router func(data interface{}) int
parallels int
}
@@ -134,12 +133,12 @@ func NewEventQueue(name string, partitionCount, sizePerPartition int, contextGen
return &EventQueue{name: name, count: partitionCount, partitions:
partitions}
}
-func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize,
parallels int, dataReader func(r *reader.Reader) interface{},
+func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize,
parallels int, dataSupplier func() interface{},
routeGenerator func(data interface{}) int) {
e.receivers = append(e.receivers, &mapReceiver{
emap: emap,
perCPUBuffer: perCPUBufferSize,
- dataReader: dataReader,
+ dataSupplier: dataSupplier,
router: routeGenerator,
parallels: parallels,
})
@@ -167,10 +166,9 @@ func (e *EventQueue) PartitionContexts() []PartitionContext {
func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
for _, r := range e.receivers {
func(receiver *mapReceiver) {
- linker.ReadEventAsyncWithBufferSize(receiver.emap,
func(r *reader.Reader) {
- data := receiver.dataReader(r)
+ linker.ReadEventAsyncWithBufferSize(receiver.emap,
func(data interface{}) {
e.routerTransformer(data, receiver.router)
- }, receiver.perCPUBuffer, r.parallels)
+ }, receiver.perCPUBuffer, r.parallels,
receiver.dataSupplier)
}(r)
}