This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch perf in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit 3e5f3fa9a116e415fc4432e935ce492768e91433 Author: mrproliu <[email protected]> AuthorDate: Wed Dec 25 13:22:34 2024 +0800 Reduce memory using --- pkg/accesslog/collector/protocols/queue.go | 7 ++++++- pkg/accesslog/collector/ztunnel.go | 3 ++- pkg/process/finders/kubernetes/process.go | 17 +++++++++++------ .../analyze/layer7/protocols/http1/reader/reader.go | 16 ++++++++++++++++ .../analyze/layer7/protocols/http1/reader/request.go | 5 ++++- .../layer7/protocols/http1/reader/response.go | 5 ++++- pkg/tools/btf/ebpf.go | 20 +++++++++++--------- 7 files changed, 54 insertions(+), 19 deletions(-) diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index c9789b7..c9c2fdf 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -23,6 +23,7 @@ import ( "fmt" "os" "sort" + "strconv" "sync" "time" @@ -251,7 +252,11 @@ func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, } func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string { - return fmt.Sprintf("%d_%d", conID, ranID) + buf := make([]byte, 0, 42) // 21 + 1 + 21 + buf = strconv.AppendUint(buf, conID, 10) + buf = append(buf, '_') + buf = strconv.AppendUint(buf, ranID, 10) + return string(buf) } func (p *PartitionContext) processEvents() { diff --git a/pkg/accesslog/collector/ztunnel.go b/pkg/accesslog/collector/ztunnel.go index 6b7eeeb..f6a02ae 100644 --- a/pkg/accesslog/collector/ztunnel.go +++ b/pkg/accesslog/collector/ztunnel.go @@ -110,7 +110,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogConte } func (z *ZTunnelCollector) ReadyToFlushConnection(connection *common.ConnectionInfo, _ events.Event) { - if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil { + if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil || + z.ipMappingCache.Len() == 0 { return } key := z.buildIPMappingCacheKey(connection.Socket.SrcIP, int(connection.Socket.SrcPort), diff --git a/pkg/process/finders/kubernetes/process.go b/pkg/process/finders/kubernetes/process.go index 9879258..bf20683 100644 --- a/pkg/process/finders/kubernetes/process.go +++ b/pkg/process/finders/kubernetes/process.go @@ -18,6 +18,8 @@ package kubernetes import ( + "sync" + "github.com/shirou/gopsutil/process" "github.com/apache/skywalking-rover/pkg/process/api" @@ -29,24 +31,23 @@ type Process struct { original *process.Process // process data - pid int32 - cmd string - profiling *profiling.Info - podContainer *PodContainer + pid int32 + cmd string + profilingOnce sync.Once + profiling *profiling.Info + podContainer *PodContainer // entity for the backend entity *api.ProcessEntity } func NewProcess(p *process.Process, cmdline string, pc *PodContainer, entity *api.ProcessEntity) *Process { - stat, _ := base.BuildProfilingStat(p) return &Process{ original: p, pid: p.Pid, cmd: cmdline, podContainer: pc, entity: entity, - profiling: stat, } } @@ -67,6 +68,10 @@ func (p *Process) DetectType() api.ProcessDetectType { } func (p *Process) ProfilingStat() *profiling.Info { + p.profilingOnce.Do(func() { + stat, _ := base.BuildProfilingStat(p.original) + p.profiling = stat + }) return p.profiling } diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go index 151e563..38af27d 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go @@ -26,6 +26,7 @@ import ( "net/http" "strconv" "strings" + "sync" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/tools/buffer" @@ -38,6 +39,11 @@ var ( requestMethods = []string{ "GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", "TRACE", "PATCH", } + pooledReader = sync.Pool{ + New: func() any { + return bufio.NewReader(nil) + }, + } ) var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1", "reader") @@ -313,3 +319,13 @@ func (c *charsetReadWrapper) Read(p []byte) (n int, err error) { func (c *charsetReadWrapper) Close() error { return nil } + +func newPooledReaderFromBuffer(b *buffer.Buffer) *bufio.Reader { + reader := pooledReader.Get().(*bufio.Reader) + reader.Reset(b) + return reader +} + +func releasePooledReader(r *bufio.Reader) { + pooledReader.Put(r) +} diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go index f966743..0134d8b 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go @@ -70,7 +70,10 @@ func (r *Request) Original() *http.Request { // nolint func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) { - bufReader := bufio.NewReader(buf) + bufReader := newPooledReaderFromBuffer(buf) + defer func() { + releasePooledReader(bufReader) + }() tp := textproto.NewReader(bufReader) req := &http.Request{} result := &Request{original: req, reader: r} diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go index fae907b..3640df1 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go @@ -59,7 +59,10 @@ func (r *Response) Original() *http.Response { } func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) { - bufReader := bufio.NewReader(buf) + bufReader := newPooledReaderFromBuffer(buf) + defer func() { + releasePooledReader(bufReader) + }() tp := textproto.NewReader(bufReader) resp := &http.Response{} result := &Response{original: resp, req: req, reader: r} diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go index 0e3bdb0..fe952bd 100644 --- a/pkg/tools/btf/ebpf.go +++ b/pkg/tools/btf/ebpf.go @@ -52,45 +52,47 @@ func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interfa func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions { findBTFOnce.Do(func() { - readSpec, err := getKernelBTFAddress() + readSpec, kernel, err := getKernelBTFAddress() if err != nil { log.Warnf("found BTF failure: %v", err) return } - spec = readSpec + if !kernel { + spec = readSpec + } }) return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}} } // getKernelBTFAddress means get the kernel BTF file path -func getKernelBTFAddress() (spec *btf.Spec, err error) { +func getKernelBTFAddress() (spec *btf.Spec, fromKernel bool, err error) { spec, err = btf.LoadKernelSpec() if err == nil { - return spec, nil + return nil, true, nil } distributeInfo, err := operator.GetDistributionInfo() if err != nil { - return nil, fmt.Errorf("could not load the system distribute info: %v", err) + return nil, false, fmt.Errorf("could not load the system distribute info: %v", err) } uname, err := operator.GetOSUname() if err != nil { - return nil, fmt.Errorf("could not load the uname info: %v", err) + return nil, false, fmt.Errorf("could not load the uname info: %v", err) } path := fmt.Sprintf("files/%s/%s/%s/%s.btf", distributeInfo.Name, distributeInfo.Version, distributeInfo.Architecture, uname.Release) bpfObjBuff, err := asset(path) if err != nil { - return nil, fmt.Errorf("could not found customized BTF file: %s", path) + return nil, false, fmt.Errorf("could not found customized BTF file: %s", path) } spec, err = btf.LoadSpecFromReader(bytes.NewReader(bpfObjBuff)) if err != nil { - return nil, fmt.Errorf("could not load customized BTF file: %s", path) + return nil, false, fmt.Errorf("could not load customized BTF file: %s", path) } - return spec, nil + return spec, false, nil } func asset(file string) ([]byte, error) {
