This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch perf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit 3e5f3fa9a116e415fc4432e935ce492768e91433
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 25 13:22:34 2024 +0800

    Reduce memory using
---
 pkg/accesslog/collector/protocols/queue.go           |  7 ++++++-
 pkg/accesslog/collector/ztunnel.go                   |  3 ++-
 pkg/process/finders/kubernetes/process.go            | 17 +++++++++++------
 .../analyze/layer7/protocols/http1/reader/reader.go  | 16 ++++++++++++++++
 .../analyze/layer7/protocols/http1/reader/request.go |  5 ++++-
 .../layer7/protocols/http1/reader/response.go        |  5 ++++-
 pkg/tools/btf/ebpf.go                                | 20 +++++++++++---------
 7 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index c9789b7..c9c2fdf 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -23,6 +23,7 @@ import (
        "fmt"
        "os"
        "sort"
+       "strconv"
        "sync"
        "time"
 
@@ -251,7 +252,11 @@ func (p *PartitionContext) 
getConnectionContext(connectionID, randomID uint64,
 }
 
 func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string {
-       return fmt.Sprintf("%d_%d", conID, ranID)
+       buf := make([]byte, 0, 42) // 21 + 1 + 21
+       buf = strconv.AppendUint(buf, conID, 10)
+       buf = append(buf, '_')
+       buf = strconv.AppendUint(buf, ranID, 10)
+       return string(buf)
 }
 
 func (p *PartitionContext) processEvents() {
diff --git a/pkg/accesslog/collector/ztunnel.go 
b/pkg/accesslog/collector/ztunnel.go
index 6b7eeeb..f6a02ae 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -110,7 +110,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx 
*common.AccessLogConte
 }
 
 func (z *ZTunnelCollector) ReadyToFlushConnection(connection 
*common.ConnectionInfo, _ events.Event) {
-       if connection == nil || connection.Socket == nil || 
connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil {
+       if connection == nil || connection.Socket == nil || 
connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil ||
+               z.ipMappingCache.Len() == 0 {
                return
        }
        key := z.buildIPMappingCacheKey(connection.Socket.SrcIP, 
int(connection.Socket.SrcPort),
diff --git a/pkg/process/finders/kubernetes/process.go 
b/pkg/process/finders/kubernetes/process.go
index 9879258..bf20683 100644
--- a/pkg/process/finders/kubernetes/process.go
+++ b/pkg/process/finders/kubernetes/process.go
@@ -18,6 +18,8 @@
 package kubernetes
 
 import (
+       "sync"
+
        "github.com/shirou/gopsutil/process"
 
        "github.com/apache/skywalking-rover/pkg/process/api"
@@ -29,24 +31,23 @@ type Process struct {
        original *process.Process
 
        // process data
-       pid          int32
-       cmd          string
-       profiling    *profiling.Info
-       podContainer *PodContainer
+       pid           int32
+       cmd           string
+       profilingOnce sync.Once
+       profiling     *profiling.Info
+       podContainer  *PodContainer
 
        // entity for the backend
        entity *api.ProcessEntity
 }
 
 func NewProcess(p *process.Process, cmdline string, pc *PodContainer, entity 
*api.ProcessEntity) *Process {
-       stat, _ := base.BuildProfilingStat(p)
        return &Process{
                original:     p,
                pid:          p.Pid,
                cmd:          cmdline,
                podContainer: pc,
                entity:       entity,
-               profiling:    stat,
        }
 }
 
@@ -67,6 +68,10 @@ func (p *Process) DetectType() api.ProcessDetectType {
 }
 
 func (p *Process) ProfilingStat() *profiling.Info {
+       p.profilingOnce.Do(func() {
+               stat, _ := base.BuildProfilingStat(p.original)
+               p.profiling = stat
+       })
        return p.profiling
 }
 
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index 151e563..38af27d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -26,6 +26,7 @@ import (
        "net/http"
        "strconv"
        "strings"
+       "sync"
 
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/tools/buffer"
@@ -38,6 +39,11 @@ var (
        requestMethods = []string{
                "GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", 
"TRACE", "PATCH",
        }
+       pooledReader = sync.Pool{
+               New: func() any {
+                       return bufio.NewReader(nil)
+               },
+       }
 )
 
 var log = logger.GetLogger("profiling", "task", "network", "layer7", 
"protocols", "http1", "reader")
@@ -313,3 +319,13 @@ func (c *charsetReadWrapper) Read(p []byte) (n int, err 
error) {
 func (c *charsetReadWrapper) Close() error {
        return nil
 }
+
+func newPooledReaderFromBuffer(b *buffer.Buffer) *bufio.Reader {
+       reader := pooledReader.Get().(*bufio.Reader)
+       reader.Reset(b)
+       return reader
+}
+
+func releasePooledReader(r *bufio.Reader) {
+       pooledReader.Put(r)
+}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index f966743..0134d8b 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -70,7 +70,10 @@ func (r *Request) Original() *http.Request {
 
 // nolint
 func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, 
enums.ParseResult, error) {
-       bufReader := bufio.NewReader(buf)
+       bufReader := newPooledReaderFromBuffer(buf)
+       defer func() {
+               releasePooledReader(bufReader)
+       }()
        tp := textproto.NewReader(bufReader)
        req := &http.Request{}
        result := &Request{original: req, reader: r}
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
index fae907b..3640df1 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -59,7 +59,10 @@ func (r *Response) Original() *http.Response {
 }
 
 func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) 
(*Response, enums.ParseResult, error) {
-       bufReader := bufio.NewReader(buf)
+       bufReader := newPooledReaderFromBuffer(buf)
+       defer func() {
+               releasePooledReader(bufReader)
+       }()
        tp := textproto.NewReader(bufReader)
        resp := &http.Response{}
        result := &Response{original: resp, req: req, reader: r}
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 0e3bdb0..fe952bd 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -52,45 +52,47 @@ func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, 
error), objs interfa
 
 func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) 
*ebpf.CollectionOptions {
        findBTFOnce.Do(func() {
-               readSpec, err := getKernelBTFAddress()
+               readSpec, kernel, err := getKernelBTFAddress()
                if err != nil {
                        log.Warnf("found BTF failure: %v", err)
                        return
                }
 
-               spec = readSpec
+               if !kernel {
+                       spec = readSpec
+               }
        })
 
        return &ebpf.CollectionOptions{Programs: 
ebpf.ProgramOptions{KernelTypes: spec}}
 }
 
 // getKernelBTFAddress means get the kernel BTF file path
-func getKernelBTFAddress() (spec *btf.Spec, err error) {
+func getKernelBTFAddress() (spec *btf.Spec, fromKernel bool, err error) {
        spec, err = btf.LoadKernelSpec()
        if err == nil {
-               return spec, nil
+               return nil, true, nil
        }
 
        distributeInfo, err := operator.GetDistributionInfo()
        if err != nil {
-               return nil, fmt.Errorf("could not load the system distribute 
info: %v", err)
+               return nil, false, fmt.Errorf("could not load the system 
distribute info: %v", err)
        }
        uname, err := operator.GetOSUname()
        if err != nil {
-               return nil, fmt.Errorf("could not load the uname info: %v", err)
+               return nil, false, fmt.Errorf("could not load the uname info: 
%v", err)
        }
 
        path := fmt.Sprintf("files/%s/%s/%s/%s.btf", distributeInfo.Name, 
distributeInfo.Version,
                distributeInfo.Architecture, uname.Release)
        bpfObjBuff, err := asset(path)
        if err != nil {
-               return nil, fmt.Errorf("could not found customized BTF file: 
%s", path)
+               return nil, false, fmt.Errorf("could not found customized BTF 
file: %s", path)
        }
        spec, err = btf.LoadSpecFromReader(bytes.NewReader(bpfObjBuff))
        if err != nil {
-               return nil, fmt.Errorf("could not load customized BTF file: 
%s", path)
+               return nil, false, fmt.Errorf("could not load customized BTF 
file: %s", path)
        }
-       return spec, nil
+       return spec, false, nil
 }
 
 func asset(file string) ([]byte, error) {

Reply via email to