This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by 
this push:
     new d833c09  pooled socket data
d833c09 is described below

commit d833c09d12abe925353caa375080b8a5f7960e97
Author: mrproliu <[email protected]>
AuthorDate: Sun Dec 29 21:27:08 2024 +0800

    pooled socket data
---
 pkg/accesslog/collector/protocols/queue.go          |  2 +-
 pkg/accesslog/common/filter.go                      |  4 ++++
 pkg/accesslog/events/data.go                        |  4 ++++
 pkg/profiling/task/network/analyze/events/data.go   |  4 ++++
 pkg/profiling/task/network/analyze/layer7/events.go |  3 ++-
 pkg/tools/buffer/buffer.go                          | 20 +++++++++++++++++++-
 6 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index dfd053b..61e0c60 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -102,7 +102,7 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
                })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, 
int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
-                       return &events.SocketDataUploadEvent{}
+                       return &events.SocketDataUploadEvent{Buffer: 
buffer.BorrowNewBuffer()}
                }, func(data interface{}) int {
                        return 
int(data.(*events.SocketDataUploadEvent).ConnectionID)
                })
diff --git a/pkg/accesslog/common/filter.go b/pkg/accesslog/common/filter.go
index 0d60012..db23ef5 100644
--- a/pkg/accesslog/common/filter.go
+++ b/pkg/accesslog/common/filter.go
@@ -46,7 +46,11 @@ func NewStaticMonitorFilter(namespaces, clusters []string) 
*StaticMonitorFilter
 }
 
 func (s *StaticMonitorFilter) ShouldIncludeProcesses(processes 
[]api.ProcessInterface) (res []api.ProcessInterface) {
+       //var selfPid = os.Getpid()
        for _, entity := range processes {
+               //if int(entity.Pid()) == selfPid {
+               //      continue
+               //}
                if entity.DetectType() != api.Kubernetes { // for now, we only 
have the kubernetes detected processes
                        continue
                }
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 6ecfd2f..2390a2a 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -41,6 +41,10 @@ type SocketDataUploadEvent struct {
        Buffer       [2048]byte
 }
 
+func (s *SocketDataUploadEvent) ReleaseBuffer() [2048]byte {
+       return s.Buffer
+}
+
 func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
        s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8())
        s.HaveReduce = r.ReadUint8()
diff --git a/pkg/profiling/task/network/analyze/events/data.go 
b/pkg/profiling/task/network/analyze/events/data.go
index bc451fc..53f388f 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -40,6 +40,10 @@ type SocketDataUploadEvent struct {
        Buffer       [2048]byte
 }
 
+func (s *SocketDataUploadEvent) ReleaseBuffer() [2048]byte {
+       return s.Buffer
+}
+
 func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
        return s.Protocol0
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 1c01084..73c3fdf 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -19,6 +19,7 @@ package layer7
 
 import (
        "context"
+       "github.com/apache/skywalking-rover/pkg/tools/buffer"
 
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        analyzeBase 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
@@ -37,7 +38,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize 
int, config *profili
 func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) 
{
        // socket buffer data
        l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
-               return &analyzeBase.SocketDataUploadEvent{}
+               return &analyzeBase.SocketDataUploadEvent{Buffer: 
buffer.BorrowNewBuffer()}
        }, func(data interface{}) int {
                return 
int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
        })
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 4c18bff..86ad364 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -37,8 +37,18 @@ var (
        emptyList      = list.New()
 
        log = logger.GetLogger("tools", "buffer")
+
+       PooledBuffer = sync.Pool{
+               New: func() any {
+                       return [2048]byte{}
+               },
+       }
 )
 
+func BorrowNewBuffer() [2048]byte {
+       return PooledBuffer.Get().([2048]byte)
+}
+
 type SocketDataBuffer interface {
        // Protocol of the buffer
        Protocol() enums.ConnectionProtocol
@@ -71,6 +81,8 @@ type SocketDataBuffer interface {
        StartTime() uint64
        // EndTime the data end timestamp
        EndTime() uint64
+
+       ReleaseBuffer() [2048]byte
 }
 
 type SocketDataDetail interface {
@@ -618,7 +630,7 @@ func (r *Buffer) PrepareForReading() bool {
        if r.shouldResetPosition {
                r.ResetForLoopReading()
                r.shouldResetPosition = false
-               return false
+               return r.PrepareForReading()
        }
        if r.head == nil || r.head.element == nil {
                // read in the first element
@@ -708,6 +720,11 @@ func (r *Buffer) removeElement0(element *list.Element) 
*list.Element {
        }
        result := element.Next()
        r.dataEvents.Remove(element)
+       if element.Value != nil {
+               if b, ok := element.Value.(SocketDataBuffer); ok && b != nil {
+                       PooledBuffer.Put(b.ReleaseBuffer())
+               }
+       }
        return result
 }
 
@@ -799,6 +816,7 @@ func (r *Buffer) DeleteExpireEvents(expireDuration 
time.Duration) int {
                startTime := host.Time(buffer.StartTime())
                if expireTime.After(startTime) {
                        r.latestExpiredDataID = buffer.DataID()
+                       PooledBuffer.Put(buffer.ReleaseBuffer())
                        return true
                }
                return false

Reply via email to