This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new f7d64cf  Optimization and Bug Fix for BPF and Socket Detail Data (#75)
f7d64cf is described below

commit f7d64cf6eb1aa7aa69ae092fc719774c017e39c2
Author: mrproliu <[email protected]>
AuthorDate: Thu Feb 2 23:39:05 2023 +0800

    Optimization and Bug Fix for BPF and Socket Detail Data (#75)
---
 CHANGES.md                                         |  1 +
 bpf/profiling/network/args.h                       |  4 +-
 bpf/profiling/network/sock_stats.h                 |  7 ++-
 bpf/profiling/network/socket_detail.h              |  2 +-
 .../task/network/analyze/base/connection.go        |  3 ++
 pkg/profiling/task/network/analyze/base/events.go  |  4 +-
 .../task/network/analyze/layer4/listener.go        |  2 +-
 .../task/network/analyze/layer7/events.go          | 11 ++++
 .../task/network/analyze/layer7/listener.go        |  1 +
 .../analyze/layer7/protocols/base/analyzer.go      | 59 +++++++++++++++-------
 .../analyze/layer7/protocols/base/buffer.go        | 30 +++++++++--
 .../analyze/layer7/protocols/base/events.go        |  4 +-
 .../layer7/protocols/http1/reader/reader.go        |  2 +-
 .../network/analyze/layer7/protocols/protocols.go  |  6 +++
 14 files changed, 102 insertions(+), 34 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ecdfc29..41d3be3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
 * Enhance the protocol reader for support long socket data.
 * Add the syscall level event to the trace.
 * Support OpenSSL 3.0.x.
+* Optimized the data structure in BPF.
 
 #### Bug Fixes
 * Fix HTTP method name in protocol analyzer
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index 34e0bd2..9d7c90b 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -131,8 +131,8 @@ struct sock_data_args_t {
     unsigned int* msg_len;
     __u64 start_nacs;
     // rtt
-    __u64 rtt_count;
-    __u64 rtt_time;
+    __u32 rtt_count;
+    __u32 rtt_time;
     // buffer
     char* buf;
     struct iovec *iovec;
diff --git a/bpf/profiling/network/sock_stats.h 
b/bpf/profiling/network/sock_stats.h
index 604071d..54f6775 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -58,14 +58,13 @@ struct active_connection_t {
     __u64 write_rtt_count;
     __u64 write_rtt_time;
 
+    void *last_recv_sk_buff;
     // for protocol analyze
     __u8 protocol;
     // connect event already send
     __u8 connect_event_send;
     // current connection is ssl
     __u8 ssl;
-    __u8 fix;
-    void *last_recv_sk_buff;
 };
 struct {
        __uint(type, BPF_MAP_TYPE_HASH);
@@ -168,8 +167,8 @@ struct socket_close_event_t {
     __u64 read_exe_time;
 
     // RTT when write
-    __u64 write_rtt_count;
-    __u64 write_rtt_time;
+    __u32 write_rtt_count;
+    __u32 write_rtt_time;
 };
 struct {
        __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
diff --git a/bpf/profiling/network/socket_detail.h 
b/bpf/profiling/network/socket_detail.h
index 2d24e0f..b4bfa0b 100644
--- a/bpf/profiling/network/socket_detail.h
+++ b/bpf/profiling/network/socket_detail.h
@@ -29,7 +29,7 @@ struct socket_detail_t {
     __u8 func_name;
     __u8 rtt_count;
     __u8 protocol;
-    __u64 rtt_time;
+    __u32 rtt_time;
 };
 
 struct {
diff --git a/pkg/profiling/task/network/analyze/base/connection.go 
b/pkg/profiling/task/network/analyze/base/connection.go
index d637463..2099fb9 100644
--- a/pkg/profiling/task/network/analyze/base/connection.go
+++ b/pkg/profiling/task/network/analyze/base/connection.go
@@ -84,6 +84,9 @@ type ActiveConnectionInBPF struct {
        WriteRTTCount   uint64
        WriteRTTExeTime uint64
 
+       // sk buffer
+       _ uint64
+
        // protocol of connection
        Protocol ConnectionProtocol
        // the connect event is already sent
diff --git a/pkg/profiling/task/network/analyze/base/events.go 
b/pkg/profiling/task/network/analyze/base/events.go
index 5bdc2aa..ac6a4a1 100644
--- a/pkg/profiling/task/network/analyze/base/events.go
+++ b/pkg/profiling/task/network/analyze/base/events.go
@@ -65,6 +65,6 @@ type SocketCloseEvent struct {
        ReadCount    uint64
        ReadExeTime  uint64
 
-       WriteRTTCount   uint64
-       WriteRTTExeTime uint64
+       WriteRTTCount   uint32
+       WriteRTTExeTime uint32
 }
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go 
b/pkg/profiling/task/network/analyze/layer4/listener.go
index 162e35c..231ac0b 100644
--- a/pkg/profiling/task/network/analyze/layer4/listener.go
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -79,7 +79,7 @@ func (l *Listener) ReceiveCloseConnection(ctx 
*base.ConnectionContext, event *ba
        // data transmit counters
        layer4.WriteCounter.UpdateToCurrent(event.WriteBytes, event.WriteCount, 
event.WriteExeTime)
        layer4.ReadCounter.UpdateToCurrent(event.ReadBytes, event.ReadCount, 
event.ReadExeTime)
-       layer4.WriteRTTCounter.UpdateToCurrent(0, event.WriteRTTCount, 
event.WriteRTTExeTime)
+       layer4.WriteRTTCounter.UpdateToCurrent(0, uint64(event.WriteRTTCount), 
uint64(event.WriteRTTExeTime))
 
        // connection close execute time
        layer4.CloseExecuteTime = event.ExeTime
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go 
b/pkg/profiling/task/network/analyze/layer7/events.go
index 68b6520..a5189fb 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -21,6 +21,7 @@ import (
        "context"
 
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+       analyzeBase 
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
        "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
@@ -60,6 +61,16 @@ func (l *Listener) handleProfilingExtensionConfig(config 
*profiling.ExtensionCon
        }
 }
 
+func (l *Listener) handleConnectionClose(event *analyzeBase.SocketCloseEvent) {
+       if l.socketDataQueue == nil {
+               return
+       }
+       for _, p := range l.socketDataQueue.partitions {
+               ctx := p.ctx.(*SocketDataPartitionContext)
+               ctx.analyzer.ReceiveSocketClose(event)
+       }
+}
+
 type SocketDataPartitionContext struct {
        analyzer *protocols.Analyzer
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go 
b/pkg/profiling/task/network/analyze/layer7/listener.go
index 61f32bd..bd0d59a 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -97,6 +97,7 @@ func (l *Listener) ReceiveNewConnection(ctx 
*base.ConnectionContext, event *base
 func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event 
*base.SocketCloseEvent) {
        // cached the closed connection with TTL
        l.cachedConnections.Set(l.generateCachedConnectionKey(ctx.ConnectionID, 
ctx.RandomID), ctx, ConnectionCachedTTL)
+       l.handleConnectionClose(event)
 }
 
 func (l *Listener) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 9449116..8b4755d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -19,9 +19,12 @@ package base
 
 import (
        "context"
+       "fmt"
        "sync"
        "time"
 
+       cmap "github.com/orcaman/concurrent-map"
+
        "github.com/apache/skywalking-rover/pkg/logger"
        profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
        
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
@@ -39,7 +42,7 @@ type ProtocolAnalyzer struct {
        protocol        Protocol
        config          *profiling.TaskConfig
 
-       connections       map[connectionKey]*connectionInfo
+       connections       cmap.ConcurrentMap // connections with concurrent 
key: connection id+random id, value: *connectionInfo
        analyzeLocker     sync.Mutex
        receiveEventCount int
 }
@@ -49,7 +52,7 @@ func NewProtocolAnalyzer(protocolContext Context, p Protocol, 
config *profiling.
                protocolContext: protocolContext,
                protocol:        p,
                config:          config,
-               connections:     make(map[connectionKey]*connectionInfo),
+               connections:     cmap.New(),
        }
 }
 
@@ -116,14 +119,15 @@ func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, 
event *SocketDataUploa
 }
 
 func (a *ProtocolAnalyzer) getConnection(ctx Context, connectionID, randomID 
uint64) *connectionInfo {
-       key := connectionKey{connectionID: connectionID, randomID: randomID}
-       connection := a.connections[key]
+       conKey := a.generateConnectionInfoKey(connectionID, randomID)
+       connection, _ := a.connections.Get(conKey)
        if connection == nil {
-               connection = newConnectionInfo(a.protocol, ctx, 
key.connectionID, key.randomID)
-               a.connections[key] = connection
+               connection = newConnectionInfo(a.protocol, ctx, connectionID, 
randomID)
+               a.connections.Set(conKey, connection)
        }
-       connection.checkConnectionMetrics(ctx)
-       return connection
+       info := connection.(*connectionInfo)
+       info.checkConnectionMetrics(ctx)
+       return info
 }
 
 // processEvents means analyze the protocol in each connection
@@ -135,8 +139,19 @@ func (a *ProtocolAnalyzer) processEvents() {
        }
        defer a.analyzeLocker.Unlock()
 
-       for _, connection := range a.connections {
-               a.processConnectionEvents(connection)
+       closedConnections := make([]string, 0)
+       a.connections.IterCb(func(conKey string, con interface{}) {
+               info := con.(*connectionInfo)
+               a.processConnectionEvents(info)
+
+               // if the connection already closed and not contains any buffer 
data, then delete the connection
+               if info.closed && info.buffer.dataEvents.Len() == 0 {
+                       closedConnections = append(closedConnections, conKey)
+               }
+       })
+
+       for _, conKey := range closedConnections {
+               a.connections.Remove(conKey)
        }
 }
 
@@ -146,9 +161,9 @@ func (a *ProtocolAnalyzer) 
processExpireEvents(expireDuration time.Duration) {
        a.analyzeLocker.Lock()
        defer a.analyzeLocker.Unlock()
 
-       for _, connection := range a.connections {
-               a.processConnectionExpireEvents(connection, expireDuration)
-       }
+       a.connections.IterCb(func(_ string, con interface{}) {
+               a.processConnectionExpireEvents(con.(*connectionInfo), 
expireDuration)
+       })
 }
 
 func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) 
{
@@ -161,7 +176,7 @@ func (a *ProtocolAnalyzer) 
processConnectionEvents(connection *connectionInfo) {
        for {
                // reset the status of reading
                if !buffer.prepareForReading() {
-                       log.Debugf("prepare finsihed: event size: %d", 
buffer.dataEvents.Len())
+                       log.Debugf("prepare finsihed: reduce data event size: 
%d", buffer.dataEvents.Len())
                        return
                }
 
@@ -175,7 +190,7 @@ func (a *ProtocolAnalyzer) 
processConnectionEvents(connection *connectionInfo) {
                }
 
                if finishReading {
-                       log.Debugf("reading finsihed: event size: %d", 
buffer.dataEvents.Len())
+                       log.Debugf("reading finsihed: reduce data event size: 
%d", buffer.dataEvents.Len())
                        break
                }
        }
@@ -191,9 +206,16 @@ func (a *ProtocolAnalyzer) UpdateExtensionConfig(config 
*profiling.ExtensionConf
        a.protocol.UpdateExtensionConfig(config)
 }
 
-type connectionKey struct {
-       connectionID uint64
-       randomID     uint64
+func (a *ProtocolAnalyzer) ReceiveSocketCloseEvent(event 
*base.SocketCloseEvent) {
+       con, _ := a.connections.Get(a.generateConnectionInfoKey(event.ConID, 
event.RandomID))
+       if con == nil {
+               return
+       }
+       con.(*connectionInfo).closed = true
+}
+
+func (a *ProtocolAnalyzer) generateConnectionInfoKey(connectionID, randomID 
uint64) string {
+       return fmt.Sprintf("%d_%d", connectionID, randomID)
 }
 
 type connectionInfo struct {
@@ -202,6 +224,7 @@ type connectionInfo struct {
        buffer                 *Buffer
        metrics                Metrics
        metricsFromConnection  bool
+       closed                 bool
 }
 
 func newConnectionInfo(p Protocol, connectionContext Context, connectionID, 
randomID uint64) *connectionInfo {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
index bbcac23..ec374fc 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
@@ -41,6 +41,10 @@ type Buffer struct {
 
        head    *BufferPosition
        current *BufferPosition
+
+       // record the latest expired data id in connection for expire the older 
socket detail
+       // because the older socket detail may not be received in buffer
+       latestExpiredDataID uint64
 }
 
 type BufferPosition struct {
@@ -489,14 +493,32 @@ func (r *Buffer) deleteExpireEvents(expireDuration 
time.Duration) int {
        defer r.eventLocker.Unlock()
 
        expireTime := time.Now().Add(-expireDuration)
-       count := 0
-       for e := r.dataEvents.Front(); e != nil; {
-               startTime := host.Time(e.Value.(SocketDataBuffer).StartTime())
+       // data event queue
+       count := r.deleteEventsWithJudgement(r.dataEvents, func(element 
*list.Element) bool {
+               buffer := element.Value.(SocketDataBuffer)
+               startTime := host.Time(buffer.StartTime())
                if expireTime.After(startTime) {
+                       r.latestExpiredDataID = buffer.DataID()
+                       return true
+               }
+               return false
+       })
+
+       // detail event queue
+       count += r.deleteEventsWithJudgement(r.detailEvents, func(element 
*list.Element) bool {
+               return r.latestExpiredDataID > 0 && 
element.Value.(*SocketDetailEvent).DataID <= r.latestExpiredDataID
+       })
+       return count
+}
+
+func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element 
*list.Element) bool) int {
+       count := 0
+       for e := l.Front(); e != nil; {
+               if checker(e) {
                        count++
                        cur := e
                        e = e.Next()
-                       r.dataEvents.Remove(cur)
+                       l.Remove(cur)
                } else {
                        break
                }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index debf48c..857543c 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -24,6 +24,8 @@ import (
 )
 
 type SocketDataBuffer interface {
+       // GenerateConnectionID for identity the buffer belong which connection
+       GenerateConnectionID() string
        // BufferData of the buffer
        BufferData() []byte
        // TotalSize of socket data, the data may exceed the size of the 
BufferData()
@@ -147,7 +149,7 @@ type SocketDetailEvent struct {
        FuncName         base.SocketFunctionName
        RTTCount         uint8
        Protocol         base.ConnectionProtocol
-       RTTTime          uint64
+       RTTTime          uint32
 }
 
 func (s *SocketDetailEvent) GenerateConnectionID() string {
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index ace2a13..4197e72 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -99,7 +99,7 @@ func (m *MessageOpt) StartTime() uint64 {
 }
 
 func (m *MessageOpt) EndTime() uint64 {
-       return m.HeaderBuffer().LastSocketBuffer().EndTime()
+       return m.BodyBuffer().LastSocketBuffer().EndTime()
 }
 
 func (m *MessageOpt) Direction() base.SocketDataDirection {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index ad1859c..480a3d8 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -94,6 +94,12 @@ func (a *Analyzer) UpdateExtensionConfig(config 
*profiling.ExtensionConfig) {
        }
 }
 
+func (a *Analyzer) ReceiveSocketClose(event *base.SocketCloseEvent) {
+       for _, p := range a.protocols {
+               p.ReceiveSocketCloseEvent(event)
+       }
+}
+
 type ProtocolMetrics struct {
        data map[base.ConnectionProtocol]protocol.Metrics
 }

Reply via email to