This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 31659c5  Reduce memory and memory using in the access log module (#172)
31659c5 is described below

commit 31659c52985cac9de7541c8978e3ca2a13bb0c74
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 25 20:19:34 2024 +0800

    Reduce memory and memory using in the access log module (#172)
---
 CHANGES.md                                         |  1 +
 pkg/accesslog/collector/connection.go              |  8 ++++----
 pkg/accesslog/collector/protocols/queue.go         | 15 ++++++++++-----
 pkg/accesslog/collector/ztunnel.go                 |  3 ++-
 pkg/process/finders/kubernetes/process.go          | 17 +++++++++++------
 pkg/process/finders/kubernetes/registry.go         | 12 ++++++++----
 .../task/network/analyze/layer7/events.go          |  8 ++++----
 .../layer7/protocols/http1/reader/reader.go        | 16 ++++++++++++++++
 .../layer7/protocols/http1/reader/request.go       |  5 ++++-
 .../layer7/protocols/http1/reader/response.go      |  5 ++++-
 pkg/tools/btf/ebpf.go                              | 22 ++++++++++++----------
 pkg/tools/btf/queue.go                             | 16 +++++-----------
 12 files changed, 81 insertions(+), 47 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8cb3d99..3d54791 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -20,6 +20,7 @@ Release Notes.
 * Add gRPC sender to sending the access log to the backend.
 * Add warning log when the event queue almost full in the access log module.
 * Reduce unessential `conntrack` query when detect new connection.
+* Reduce CPU and memory usage in the access log module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index d861447..4fc6823 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -84,13 +84,13 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize),
                ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} 
{
                        return &events.SocketConnectEvent{}
-               }, func(data interface{}) string {
-                       return fmt.Sprintf("%d", 
data.(*events.SocketConnectEvent).ConID)
+               }, func(data interface{}) int {
+                       return int(data.(*events.SocketConnectEvent).ConID)
                })
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() 
interface{} {
                return &events.SocketCloseEvent{}
-       }, func(data interface{}) string {
-               return fmt.Sprintf("%d", 
data.(*events.SocketCloseEvent).ConnectionID)
+       }, func(data interface{}) int {
+               return int(data.(*events.SocketCloseEvent).ConnectionID)
        })
        c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
 
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index c9789b7..a4577f7 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -23,6 +23,7 @@ import (
        "fmt"
        "os"
        "sort"
+       "strconv"
        "sync"
        "time"
 
@@ -96,14 +97,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, 
int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
                        return q.detailSupplier()
-               }, func(data interface{}) string {
-                       return fmt.Sprintf("%d", 
data.(events.SocketDetail).GetConnectionID())
+               }, func(data interface{}) int {
+                       return int(data.(events.SocketDetail).GetConnectionID())
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, 
int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
                        return &events.SocketDataUploadEvent{}
-               }, func(data interface{}) string {
-                       return fmt.Sprintf("%d", 
data.(*events.SocketDataUploadEvent).ConnectionID)
+               }, func(data interface{}) int {
+                       return 
int(data.(*events.SocketDataUploadEvent).ConnectionID)
                })
 
        q.eventQueue.Start(ctx, q.context.BPF.Linker)
@@ -251,7 +252,11 @@ func (p *PartitionContext) 
getConnectionContext(connectionID, randomID uint64,
 }
 
 func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string {
-       return fmt.Sprintf("%d_%d", conID, ranID)
+       buf := make([]byte, 0, 42) // 21 + 1 + 21
+       buf = strconv.AppendUint(buf, conID, 10)
+       buf = append(buf, '_')
+       buf = strconv.AppendUint(buf, ranID, 10)
+       return string(buf)
 }
 
 func (p *PartitionContext) processEvents() {
diff --git a/pkg/accesslog/collector/ztunnel.go 
b/pkg/accesslog/collector/ztunnel.go
index 6b7eeeb..f6a02ae 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -110,7 +110,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx 
*common.AccessLogConte
 }
 
 func (z *ZTunnelCollector) ReadyToFlushConnection(connection 
*common.ConnectionInfo, _ events.Event) {
-       if connection == nil || connection.Socket == nil || 
connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil {
+       if connection == nil || connection.Socket == nil || 
connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil ||
+               z.ipMappingCache.Len() == 0 {
                return
        }
        key := z.buildIPMappingCacheKey(connection.Socket.SrcIP, 
int(connection.Socket.SrcPort),
diff --git a/pkg/process/finders/kubernetes/process.go 
b/pkg/process/finders/kubernetes/process.go
index 9879258..bf20683 100644
--- a/pkg/process/finders/kubernetes/process.go
+++ b/pkg/process/finders/kubernetes/process.go
@@ -18,6 +18,8 @@
 package kubernetes
 
 import (
+       "sync"
+
        "github.com/shirou/gopsutil/process"
 
        "github.com/apache/skywalking-rover/pkg/process/api"
@@ -29,24 +31,23 @@ type Process struct {
        original *process.Process
 
        // process data
-       pid          int32
-       cmd          string
-       profiling    *profiling.Info
-       podContainer *PodContainer
+       pid           int32
+       cmd           string
+       profilingOnce sync.Once
+       profiling     *profiling.Info
+       podContainer  *PodContainer
 
        // entity for the backend
        entity *api.ProcessEntity
 }
 
 func NewProcess(p *process.Process, cmdline string, pc *PodContainer, entity 
*api.ProcessEntity) *Process {
-       stat, _ := base.BuildProfilingStat(p)
        return &Process{
                original:     p,
                pid:          p.Pid,
                cmd:          cmdline,
                podContainer: pc,
                entity:       entity,
-               profiling:    stat,
        }
 }
 
@@ -67,6 +68,10 @@ func (p *Process) DetectType() api.ProcessDetectType {
 }
 
 func (p *Process) ProfilingStat() *profiling.Info {
+       p.profilingOnce.Do(func() {
+               stat, _ := base.BuildProfilingStat(p.original)
+               p.profiling = stat
+       })
        return p.profiling
 }
 
diff --git a/pkg/process/finders/kubernetes/registry.go 
b/pkg/process/finders/kubernetes/registry.go
index 52169aa..e2d464d 100644
--- a/pkg/process/finders/kubernetes/registry.go
+++ b/pkg/process/finders/kubernetes/registry.go
@@ -18,6 +18,7 @@
 package kubernetes
 
 import (
+       "reflect"
        "time"
 
        "k8s.io/apimachinery/pkg/labels"
@@ -133,14 +134,17 @@ func chooseServiceName(a, b string) string {
        return b
 }
 
-func (r *Registry) OnAdd(_ interface{}) {
+func (r *Registry) OnAdd(d interface{}) {
        r.recomposePodServiceName()
 }
 
-func (r *Registry) OnUpdate(_, _ interface{}) {
-       r.recomposePodServiceName()
+func (r *Registry) OnUpdate(d, u interface{}) {
+       same := reflect.DeepEqual(d, u)
+       if !same {
+               r.recomposePodServiceName()
+       }
 }
 
-func (r *Registry) OnDelete(_ interface{}) {
+func (r *Registry) OnDelete(d interface{}) {
        r.recomposePodServiceName()
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 511a088..1c01084 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -38,15 +38,15 @@ func (l *Listener) startSocketData(ctx context.Context, 
bpfLoader *bpf.Loader) {
        // socket buffer data
        l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
                return &analyzeBase.SocketDataUploadEvent{}
-       }, func(data interface{}) string {
-               return 
data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
+       }, func(data interface{}) int {
+               return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
        })
 
        // socket detail
        l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
                return &analyzeBase.SocketDetailEvent{}
-       }, func(data interface{}) string {
-               return 
data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()
+       }, func(data interface{}) int {
+               return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
        })
 
        l.socketDataQueue.Start(ctx, bpfLoader.Linker)
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index 151e563..38af27d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -26,6 +26,7 @@ import (
        "net/http"
        "strconv"
        "strings"
+       "sync"
 
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/tools/buffer"
@@ -38,6 +39,11 @@ var (
        requestMethods = []string{
                "GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", 
"TRACE", "PATCH",
        }
+       pooledReader = sync.Pool{
+               New: func() any {
+                       return bufio.NewReader(nil)
+               },
+       }
 )
 
 var log = logger.GetLogger("profiling", "task", "network", "layer7", 
"protocols", "http1", "reader")
@@ -313,3 +319,13 @@ func (c *charsetReadWrapper) Read(p []byte) (n int, err 
error) {
 func (c *charsetReadWrapper) Close() error {
        return nil
 }
+
+func newPooledReaderFromBuffer(b *buffer.Buffer) *bufio.Reader {
+       reader := pooledReader.Get().(*bufio.Reader)
+       reader.Reset(b)
+       return reader
+}
+
+func releasePooledReader(r *bufio.Reader) {
+       pooledReader.Put(r)
+}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index f966743..0134d8b 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -70,7 +70,10 @@ func (r *Request) Original() *http.Request {
 
 // nolint
 func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, 
enums.ParseResult, error) {
-       bufReader := bufio.NewReader(buf)
+       bufReader := newPooledReaderFromBuffer(buf)
+       defer func() {
+               releasePooledReader(bufReader)
+       }()
        tp := textproto.NewReader(bufReader)
        req := &http.Request{}
        result := &Request{original: req, reader: r}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
index fae907b..3640df1 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -59,7 +59,10 @@ func (r *Response) Original() *http.Response {
 }
 
 func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) 
(*Response, enums.ParseResult, error) {
-       bufReader := bufio.NewReader(buf)
+       bufReader := newPooledReaderFromBuffer(buf)
+       defer func() {
+               releasePooledReader(bufReader)
+       }()
        tp := textproto.NewReader(bufReader)
        resp := &http.Response{}
        result := &Response{original: resp, req: req, reader: r}
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 0e3bdb0..b3552a8 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -52,45 +52,47 @@ func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, 
error), objs interfa
 
 func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) 
*ebpf.CollectionOptions {
        findBTFOnce.Do(func() {
-               readSpec, err := getKernelBTFAddress()
+               readSpec, kernel, err := getKernelBTFAddress()
                if err != nil {
                        log.Warnf("found BTF failure: %v", err)
                        return
                }
 
-               spec = readSpec
+               if !kernel {
+                       spec = readSpec
+               }
        })
 
        return &ebpf.CollectionOptions{Programs: 
ebpf.ProgramOptions{KernelTypes: spec}}
 }
 
 // getKernelBTFAddress means get the kernel BTF file path
-func getKernelBTFAddress() (spec *btf.Spec, err error) {
-       spec, err = btf.LoadKernelSpec()
+func getKernelBTFAddress() (spec *btf.Spec, fromKernel bool, err error) {
+       _, err = btf.LoadKernelSpec()
        if err == nil {
-               return spec, nil
+               return nil, true, nil
        }
 
        distributeInfo, err := operator.GetDistributionInfo()
        if err != nil {
-               return nil, fmt.Errorf("could not load the system distribute 
info: %v", err)
+               return nil, false, fmt.Errorf("could not load the system 
distribute info: %v", err)
        }
        uname, err := operator.GetOSUname()
        if err != nil {
-               return nil, fmt.Errorf("could not load the uname info: %v", err)
+               return nil, false, fmt.Errorf("could not load the uname info: 
%v", err)
        }
 
        path := fmt.Sprintf("files/%s/%s/%s/%s.btf", distributeInfo.Name, 
distributeInfo.Version,
                distributeInfo.Architecture, uname.Release)
        bpfObjBuff, err := asset(path)
        if err != nil {
-               return nil, fmt.Errorf("could not found customized BTF file: 
%s", path)
+               return nil, false, fmt.Errorf("could not found customized BTF 
file: %s", path)
        }
        spec, err = btf.LoadSpecFromReader(bytes.NewReader(bpfObjBuff))
        if err != nil {
-               return nil, fmt.Errorf("could not load customized BTF file: 
%s", path)
+               return nil, false, fmt.Errorf("could not load customized BTF 
file: %s", path)
        }
-       return spec, nil
+       return spec, false, nil
 }
 
 func asset(file string) ([]byte, error) {
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index dc78465..fdd8754 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -20,7 +20,6 @@ package btf
 import (
        "context"
        "fmt"
-       "hash/fnv"
        "sync"
        "time"
 
@@ -122,7 +121,7 @@ type mapReceiver struct {
        emap         *ebpf.Map
        perCPUBuffer int
        dataSupplier func() interface{}
-       router       func(data interface{}) string
+       router       func(data interface{}) int
        parallels    int
 }
 
@@ -135,7 +134,7 @@ func NewEventQueue(name string, partitionCount, 
sizePerPartition int, contextGen
 }
 
 func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataSupplier func() interface{},
-       routeGenerator func(data interface{}) string) {
+       routeGenerator func(data interface{}) int) {
        e.receivers = append(e.receivers, &mapReceiver{
                emap:         emap,
                perCPUBuffer: perCPUBufferSize,
@@ -151,14 +150,9 @@ func (e *EventQueue) Start(ctx context.Context, linker 
*Linker) {
        })
 }
 
-func (e *EventQueue) Push(key string, data interface{}) {
-       // calculate hash of key
-       h := fnv.New32a()
-       h.Write([]byte(key))
-       sum32 := int(h.Sum32())
-
+func (e *EventQueue) Push(key int, data interface{}) {
        // append data
-       e.partitions[sum32%e.count].channel <- data
+       e.partitions[key%e.count].channel <- data
 }
 
 func (e *EventQueue) PartitionContexts() []PartitionContext {
@@ -217,7 +211,7 @@ func (e *EventQueue) start0(ctx context.Context, linker 
*Linker) {
        }()
 }
 
-func (e *EventQueue) routerTransformer(data interface{}, routeGenerator 
func(data interface{}) string) {
+func (e *EventQueue) routerTransformer(data interface{}, routeGenerator 
func(data interface{}) int) {
        key := routeGenerator(data)
        e.Push(key, data)
 }

Reply via email to