This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new d833c09 pooled socket data
d833c09 is described below
commit d833c09d12abe925353caa375080b8a5f7960e97
Author: mrproliu <[email protected]>
AuthorDate: Sun Dec 29 21:27:08 2024 +0800
pooled socket data
---
pkg/accesslog/collector/protocols/queue.go | 2 +-
pkg/accesslog/common/filter.go | 4 ++++
pkg/accesslog/events/data.go | 4 ++++
pkg/profiling/task/network/analyze/events/data.go | 4 ++++
pkg/profiling/task/network/analyze/layer7/events.go | 3 ++-
pkg/tools/buffer/buffer.go | 20 +++++++++++++++++++-
6 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index dfd053b..61e0c60 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -102,7 +102,7 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue,
int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
- return &events.SocketDataUploadEvent{}
+ return &events.SocketDataUploadEvent{Buffer: buffer.BorrowNewBuffer()}
}, func(data interface{}) int {
return int(data.(*events.SocketDataUploadEvent).ConnectionID)
})
diff --git a/pkg/accesslog/common/filter.go b/pkg/accesslog/common/filter.go
index 0d60012..db23ef5 100644
--- a/pkg/accesslog/common/filter.go
+++ b/pkg/accesslog/common/filter.go
@@ -46,7 +46,11 @@ func NewStaticMonitorFilter(namespaces, clusters []string) *StaticMonitorFilter
}
func (s *StaticMonitorFilter) ShouldIncludeProcesses(processes []api.ProcessInterface) (res []api.ProcessInterface) {
+ //var selfPid = os.Getpid()
for _, entity := range processes {
+ //if int(entity.Pid()) == selfPid {
+ // continue
+ //}
if entity.DetectType() != api.Kubernetes { // for now, we only have the kubernetes detected processes
continue
}
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 6ecfd2f..2390a2a 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -41,6 +41,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}
+func (s *SocketDataUploadEvent) ReleaseBuffer() [2048]byte {
+ return s.Buffer
+}
+
func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8())
s.HaveReduce = r.ReadUint8()
diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go
index bc451fc..53f388f 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -40,6 +40,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}
+func (s *SocketDataUploadEvent) ReleaseBuffer() [2048]byte {
+ return s.Buffer
+}
+
func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
return s.Protocol0
}
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 1c01084..73c3fdf 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -19,6 +19,7 @@ package layer7
import (
"context"
+ "github.com/apache/skywalking-rover/pkg/tools/buffer"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
@@ -37,7 +38,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
// socket buffer data
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
- return &analyzeBase.SocketDataUploadEvent{}
+ return &analyzeBase.SocketDataUploadEvent{Buffer: buffer.BorrowNewBuffer()}
}, func(data interface{}) int {
return int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
})
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 4c18bff..86ad364 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -37,8 +37,18 @@ var (
emptyList = list.New()
log = logger.GetLogger("tools", "buffer")
+
+ PooledBuffer = sync.Pool{
+ New: func() any {
+ return [2048]byte{}
+ },
+ }
)
+func BorrowNewBuffer() [2048]byte {
+ return PooledBuffer.Get().([2048]byte)
+}
+
type SocketDataBuffer interface {
// Protocol of the buffer
Protocol() enums.ConnectionProtocol
@@ -71,6 +81,8 @@ type SocketDataBuffer interface {
StartTime() uint64
// EndTime the data end timestamp
EndTime() uint64
+
+ ReleaseBuffer() [2048]byte
}
type SocketDataDetail interface {
@@ -618,7 +630,7 @@ func (r *Buffer) PrepareForReading() bool {
if r.shouldResetPosition {
r.ResetForLoopReading()
r.shouldResetPosition = false
- return false
+ return r.PrepareForReading()
}
if r.head == nil || r.head.element == nil {
// read in the first element
@@ -708,6 +720,11 @@ func (r *Buffer) removeElement0(element *list.Element) *list.Element {
}
result := element.Next()
r.dataEvents.Remove(element)
+ if element.Value != nil {
+ if b, ok := element.Value.(SocketDataBuffer); ok && b != nil {
+ PooledBuffer.Put(b.ReleaseBuffer())
+ }
+ }
return result
}
@@ -799,6 +816,7 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int {
startTime := host.Time(buffer.StartTime())
if expireTime.After(startTime) {
r.latestExpiredDataID = buffer.DataID()
+ PooledBuffer.Put(buffer.ReleaseBuffer())
return true
}
return false