This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 31659c5 Reduce CPU and memory usage in the access log module (#172)
31659c5 is described below
commit 31659c52985cac9de7541c8978e3ca2a13bb0c74
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 25 20:19:34 2024 +0800
Reduce CPU and memory usage in the access log module (#172)
---
CHANGES.md | 1 +
pkg/accesslog/collector/connection.go | 8 ++++----
pkg/accesslog/collector/protocols/queue.go | 15 ++++++++++-----
pkg/accesslog/collector/ztunnel.go | 3 ++-
pkg/process/finders/kubernetes/process.go | 17 +++++++++++------
pkg/process/finders/kubernetes/registry.go | 12 ++++++++----
.../task/network/analyze/layer7/events.go | 8 ++++----
.../layer7/protocols/http1/reader/reader.go | 16 ++++++++++++++++
.../layer7/protocols/http1/reader/request.go | 5 ++++-
.../layer7/protocols/http1/reader/response.go | 5 ++++-
pkg/tools/btf/ebpf.go | 22 ++++++++++++----------
pkg/tools/btf/queue.go | 16 +++++-----------
12 files changed, 81 insertions(+), 47 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 8cb3d99..3d54791 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -20,6 +20,7 @@ Release Notes.
* Add gRPC sender to sending the access log to the backend.
* Add warning log when the event queue almost full in the access log module.
* Reduce unessential `conntrack` query when detect new connection.
+* Reduce CPU and memory usage in the access log module.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index d861447..4fc6823 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -84,13 +84,13 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
    c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize),
        ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
            return &events.SocketConnectEvent{}
-       }, func(data interface{}) string {
-           return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
+       }, func(data interface{}) int {
+           return int(data.(*events.SocketConnectEvent).ConID)
        })
    c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
        return &events.SocketCloseEvent{}
-   }, func(data interface{}) string {
-       return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID)
+   }, func(data interface{}) int {
+       return int(data.(*events.SocketCloseEvent).ConnectionID)
    })
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index c9789b7..a4577f7 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -23,6 +23,7 @@ import (
"fmt"
"os"
"sort"
+ "strconv"
"sync"
"time"
@@ -96,14 +97,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
    q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, int(q.perCPUBuffer),
        q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
            return q.detailSupplier()
-       }, func(data interface{}) string {
-           return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID())
+       }, func(data interface{}) int {
+           return int(data.(events.SocketDetail).GetConnectionID())
        })
    q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer),
        q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
            return &events.SocketDataUploadEvent{}
-       }, func(data interface{}) string {
-           return fmt.Sprintf("%d", data.(*events.SocketDataUploadEvent).ConnectionID)
+       }, func(data interface{}) int {
+           return int(data.(*events.SocketDataUploadEvent).ConnectionID)
        })
q.eventQueue.Start(ctx, q.context.BPF.Linker)
@@ -251,7 +252,11 @@ func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
}
func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string {
- return fmt.Sprintf("%d_%d", conID, ranID)
+ buf := make([]byte, 0, 42) // 21 + 1 + 21
+ buf = strconv.AppendUint(buf, conID, 10)
+ buf = append(buf, '_')
+ buf = strconv.AppendUint(buf, ranID, 10)
+ return string(buf)
}
func (p *PartitionContext) processEvents() {
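
For context, the buildConnectionKey change above replaces fmt.Sprintf("%d_%d", ...) with strconv.AppendUint into a pre-sized byte slice, avoiding the formatting machinery and intermediate allocations on every event. A minimal standalone sketch of the same pattern (function and variable names here are illustrative, not part of the patch):

    package main

    import (
        "fmt"
        "strconv"
    )

    // buildKey renders "<conID>_<ranID>" without fmt.Sprintf; 42 bytes is
    // enough for two 20-digit uint64 values plus the '_' separator.
    func buildKey(conID, ranID uint64) string {
        buf := make([]byte, 0, 42)
        buf = strconv.AppendUint(buf, conID, 10)
        buf = append(buf, '_')
        buf = strconv.AppendUint(buf, ranID, 10)
        return string(buf)
    }

    func main() {
        fmt.Println(buildKey(12345, 67890)) // prints 12345_67890
    }
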
diff --git a/pkg/accesslog/collector/ztunnel.go b/pkg/accesslog/collector/ztunnel.go
index 6b7eeeb..f6a02ae 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -110,7 +110,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogConte
}
func (z *ZTunnelCollector) ReadyToFlushConnection(connection *common.ConnectionInfo, _ events.Event) {
-   if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil {
+   if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil ||
+       z.ipMappingCache.Len() == 0 {
return
}
    key := z.buildIPMappingCacheKey(connection.Socket.SrcIP, int(connection.Socket.SrcPort),
diff --git a/pkg/process/finders/kubernetes/process.go b/pkg/process/finders/kubernetes/process.go
index 9879258..bf20683 100644
--- a/pkg/process/finders/kubernetes/process.go
+++ b/pkg/process/finders/kubernetes/process.go
@@ -18,6 +18,8 @@
package kubernetes
import (
+ "sync"
+
"github.com/shirou/gopsutil/process"
"github.com/apache/skywalking-rover/pkg/process/api"
@@ -29,24 +31,23 @@ type Process struct {
original *process.Process
// process data
- pid int32
- cmd string
- profiling *profiling.Info
- podContainer *PodContainer
+ pid int32
+ cmd string
+ profilingOnce sync.Once
+ profiling *profiling.Info
+ podContainer *PodContainer
// entity for the backend
entity *api.ProcessEntity
}
func NewProcess(p *process.Process, cmdline string, pc *PodContainer, entity *api.ProcessEntity) *Process {
- stat, _ := base.BuildProfilingStat(p)
return &Process{
original: p,
pid: p.Pid,
cmd: cmdline,
podContainer: pc,
entity: entity,
- profiling: stat,
}
}
@@ -67,6 +68,10 @@ func (p *Process) DetectType() api.ProcessDetectType {
}
func (p *Process) ProfilingStat() *profiling.Info {
+ p.profilingOnce.Do(func() {
+ stat, _ := base.BuildProfilingStat(p.original)
+ p.profiling = stat
+ })
return p.profiling
}
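
The process.go hunks above defer base.BuildProfilingStat from NewProcess to the first ProfilingStat call, guarded by sync.Once, so processes that are never profiled skip that work entirely. A self-contained sketch of the same lazy-initialization pattern (loadStat is a hypothetical stand-in for base.BuildProfilingStat):

    package main

    import (
        "fmt"
        "sync"
    )

    type lazyProcess struct {
        pid      int32
        statOnce sync.Once
        stat     string // stand-in for *profiling.Info
    }

    // loadStat is a hypothetical, relatively expensive lookup.
    func loadStat(pid int32) string {
        return fmt.Sprintf("stat-for-%d", pid)
    }

    // Stat computes the value at most once, on first use, and caches it.
    func (p *lazyProcess) Stat() string {
        p.statOnce.Do(func() {
            p.stat = loadStat(p.pid)
        })
        return p.stat
    }

    func main() {
        p := &lazyProcess{pid: 42}
        fmt.Println(p.Stat())
        fmt.Println(p.Stat()) // second call reuses the cached value
    }
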
diff --git a/pkg/process/finders/kubernetes/registry.go b/pkg/process/finders/kubernetes/registry.go
index 52169aa..e2d464d 100644
--- a/pkg/process/finders/kubernetes/registry.go
+++ b/pkg/process/finders/kubernetes/registry.go
@@ -18,6 +18,7 @@
package kubernetes
import (
+ "reflect"
"time"
"k8s.io/apimachinery/pkg/labels"
@@ -133,14 +134,17 @@ func chooseServiceName(a, b string) string {
return b
}
-func (r *Registry) OnAdd(_ interface{}) {
+func (r *Registry) OnAdd(d interface{}) {
r.recomposePodServiceName()
}
-func (r *Registry) OnUpdate(_, _ interface{}) {
- r.recomposePodServiceName()
+func (r *Registry) OnUpdate(d, u interface{}) {
+ same := reflect.DeepEqual(d, u)
+ if !same {
+ r.recomposePodServiceName()
+ }
}
-func (r *Registry) OnDelete(_ interface{}) {
+func (r *Registry) OnDelete(d interface{}) {
r.recomposePodServiceName()
}
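
The registry.go change only recomposes pod/service names when an informer update actually changes the object; resync deliveries whose old and new objects are identical are now ignored. A minimal sketch of that guard (the handler type is illustrative):

    package main

    import (
        "fmt"
        "reflect"
    )

    type handler struct{ recomputed int }

    func (h *handler) recompose() { h.recomputed++ }

    // OnUpdate skips the expensive recomputation for no-op updates.
    func (h *handler) OnUpdate(oldObj, newObj interface{}) {
        if !reflect.DeepEqual(oldObj, newObj) {
            h.recompose()
        }
    }

    func main() {
        h := &handler{}
        h.OnUpdate(map[string]string{"app": "demo"}, map[string]string{"app": "demo"})
        h.OnUpdate(map[string]string{"app": "demo"}, map[string]string{"app": "demo-v2"})
        fmt.Println(h.recomputed) // prints 1
    }
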
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 511a088..1c01084 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -38,15 +38,15 @@ func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
// socket buffer data
    l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
        return &analyzeBase.SocketDataUploadEvent{}
-   }, func(data interface{}) string {
-       return data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
+   }, func(data interface{}) int {
+       return int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
    })
// socket detail
    l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
        return &analyzeBase.SocketDetailEvent{}
-   }, func(data interface{}) string {
-       return data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()
+   }, func(data interface{}) int {
+       return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
    })
l.socketDataQueue.Start(ctx, bpfLoader.Linker)
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index 151e563..38af27d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -26,6 +26,7 @@ import (
"net/http"
"strconv"
"strings"
+ "sync"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
@@ -38,6 +39,11 @@ var (
requestMethods = []string{
"GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT",
"TRACE", "PATCH",
}
+ pooledReader = sync.Pool{
+ New: func() any {
+ return bufio.NewReader(nil)
+ },
+ }
)
var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1", "reader")
@@ -313,3 +319,13 @@ func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
func (c *charsetReadWrapper) Close() error {
return nil
}
+
+func newPooledReaderFromBuffer(b *buffer.Buffer) *bufio.Reader {
+ reader := pooledReader.Get().(*bufio.Reader)
+ reader.Reset(b)
+ return reader
+}
+
+func releasePooledReader(r *bufio.Reader) {
+ pooledReader.Put(r)
+}
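
The reader.go hunks above pool bufio.Reader instances with sync.Pool instead of allocating a fresh 4 KiB reader for every parsed HTTP request and response. A self-contained sketch of the same borrow/reset/return pattern over an ordinary io.Reader:

    package main

    import (
        "bufio"
        "fmt"
        "strings"
        "sync"
    )

    var readerPool = sync.Pool{
        New: func() any { return bufio.NewReader(nil) },
    }

    // parseFirstLine borrows a reader from the pool, points it at the input
    // with Reset, and returns it so the internal buffer can be reused.
    func parseFirstLine(input string) (string, error) {
        r := readerPool.Get().(*bufio.Reader)
        r.Reset(strings.NewReader(input))
        defer readerPool.Put(r)
        return r.ReadString('\n')
    }

    func main() {
        line, _ := parseFirstLine("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
        fmt.Printf("%q\n", line)
    }
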
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index f966743..0134d8b 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -70,7 +70,10 @@ func (r *Request) Original() *http.Request {
// nolint
func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) {
- bufReader := bufio.NewReader(buf)
+ bufReader := newPooledReaderFromBuffer(buf)
+ defer func() {
+ releasePooledReader(bufReader)
+ }()
tp := textproto.NewReader(bufReader)
req := &http.Request{}
result := &Request{original: req, reader: r}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
index fae907b..3640df1 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -59,7 +59,10 @@ func (r *Response) Original() *http.Response {
}
func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) {
- bufReader := bufio.NewReader(buf)
+ bufReader := newPooledReaderFromBuffer(buf)
+ defer func() {
+ releasePooledReader(bufReader)
+ }()
tp := textproto.NewReader(bufReader)
resp := &http.Response{}
result := &Response{original: resp, req: req, reader: r}
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 0e3bdb0..b3552a8 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -52,45 +52,47 @@ func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interfa
func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions {
findBTFOnce.Do(func() {
- readSpec, err := getKernelBTFAddress()
+ readSpec, kernel, err := getKernelBTFAddress()
if err != nil {
log.Warnf("found BTF failure: %v", err)
return
}
- spec = readSpec
+ if !kernel {
+ spec = readSpec
+ }
})
    return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}}
}
// getKernelBTFAddress means get the kernel BTF file path
-func getKernelBTFAddress() (spec *btf.Spec, err error) {
- spec, err = btf.LoadKernelSpec()
+func getKernelBTFAddress() (spec *btf.Spec, fromKernel bool, err error) {
+ _, err = btf.LoadKernelSpec()
if err == nil {
- return spec, nil
+ return nil, true, nil
}
distributeInfo, err := operator.GetDistributionInfo()
if err != nil {
- return nil, fmt.Errorf("could not load the system distribute
info: %v", err)
+ return nil, false, fmt.Errorf("could not load the system
distribute info: %v", err)
}
uname, err := operator.GetOSUname()
if err != nil {
- return nil, fmt.Errorf("could not load the uname info: %v", err)
+       return nil, false, fmt.Errorf("could not load the uname info: %v", err)
}
    path := fmt.Sprintf("files/%s/%s/%s/%s.btf", distributeInfo.Name, distributeInfo.Version,
        distributeInfo.Architecture, uname.Release)
bpfObjBuff, err := asset(path)
if err != nil {
- return nil, fmt.Errorf("could not found customized BTF file:
%s", path)
+ return nil, false, fmt.Errorf("could not found customized BTF
file: %s", path)
}
spec, err = btf.LoadSpecFromReader(bytes.NewReader(bpfObjBuff))
if err != nil {
- return nil, fmt.Errorf("could not load customized BTF file:
%s", path)
+ return nil, false, fmt.Errorf("could not load customized BTF
file: %s", path)
}
- return spec, nil
+ return spec, false, nil
}
func asset(file string) ([]byte, error) {
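
The ebpf.go rework above stops holding the kernel-provided BTF spec in memory: getKernelBTFAddress now reports whether BTF came from the running kernel, and the parsed spec is retained only for customized BTF files, since a nil KernelTypes lets the loader resolve kernel BTF by itself. A hedged sketch of that decision against the cilium/ebpf types already used here (collectionOptions is an illustrative helper, not from the patch):

    package main

    import (
        "github.com/cilium/ebpf"
        "github.com/cilium/ebpf/btf"
    )

    // collectionOptions keeps a parsed BTF spec resident only when the kernel
    // itself exposes no BTF and a customized spec has to be supplied.
    func collectionOptions(customSpec *btf.Spec, fromKernel bool) *ebpf.CollectionOptions {
        var kernelTypes *btf.Spec
        if !fromKernel {
            kernelTypes = customSpec
        }
        // nil KernelTypes: the library loads the running kernel's BTF on demand.
        return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: kernelTypes}}
    }

    func main() {
        // With kernel BTF available, no spec needs to stay in the rover process.
        _ = collectionOptions(nil, true)
    }
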
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index dc78465..fdd8754 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,7 +20,6 @@ package btf
import (
"context"
"fmt"
- "hash/fnv"
"sync"
"time"
@@ -122,7 +121,7 @@ type mapReceiver struct {
emap *ebpf.Map
perCPUBuffer int
dataSupplier func() interface{}
- router func(data interface{}) string
+ router func(data interface{}) int
parallels int
}
@@ -135,7 +134,7 @@ func NewEventQueue(name string, partitionCount, sizePerPartition int, contextGen
}
func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, parallels int, dataSupplier func() interface{},
- routeGenerator func(data interface{}) string) {
+ routeGenerator func(data interface{}) int) {
e.receivers = append(e.receivers, &mapReceiver{
emap: emap,
perCPUBuffer: perCPUBufferSize,
@@ -151,14 +150,9 @@ func (e *EventQueue) Start(ctx context.Context, linker *Linker) {
})
}
-func (e *EventQueue) Push(key string, data interface{}) {
- // calculate hash of key
- h := fnv.New32a()
- h.Write([]byte(key))
- sum32 := int(h.Sum32())
-
+func (e *EventQueue) Push(key int, data interface{}) {
// append data
- e.partitions[sum32%e.count].channel <- data
+ e.partitions[key%e.count].channel <- data
}
func (e *EventQueue) PartitionContexts() []PartitionContext {
@@ -217,7 +211,7 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
}()
}
-func (e *EventQueue) routerTransformer(data interface{}, routeGenerator func(data interface{}) string) {
+func (e *EventQueue) routerTransformer(data interface{}, routeGenerator func(data interface{}) int) {
key := routeGenerator(data)
e.Push(key, data)
}
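
With the router callbacks returning int connection IDs, Push no longer formats the key and hashes it with FNV; the numeric ID modulo the partition count selects the partition directly, which still keeps all events of one connection on the same partition. A small sketch contrasting the old and new routing (the partition count is illustrative):

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    const partitions = 4

    // routeByString mirrors the old path: format the ID, hash the string with
    // FNV-1a, reduce modulo the partition count. Two allocations per event.
    func routeByString(connectionID uint64) int {
        key := fmt.Sprintf("%d", connectionID)
        h := fnv.New32a()
        h.Write([]byte(key))
        return int(h.Sum32()) % partitions
    }

    // routeByInt mirrors the new path: the ID is already a stable routing key.
    func routeByInt(connectionID uint64) int {
        return int(connectionID) % partitions
    }

    func main() {
        fmt.Println(routeByString(12345), routeByInt(12345))
    }
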