This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new e73c782  Support analyze multiple protocol in the same connection 
(#160)
e73c782 is described below

commit e73c7829172bb7c9eb96f1ca3ba1f5fd304c2553
Author: mrproliu <[email protected]>
AuthorDate: Thu Nov 28 19:31:24 2024 +0900

    Support analyze multiple protocol in the same connection (#160)
---
 pkg/accesslog/collector/protocols/connection.go    | 12 ++---
 pkg/accesslog/collector/protocols/http1.go         | 19 +++----
 pkg/accesslog/collector/protocols/http2.go         | 21 ++++----
 pkg/accesslog/collector/protocols/queue.go         | 63 +++++++++++++++-------
 pkg/accesslog/events/data.go                       |  6 ++-
 pkg/profiling/task/network/analyze/events/data.go  |  6 ++-
 .../network/analyze/layer7/protocols/protocols.go  |  4 +-
 pkg/tools/buffer/buffer.go                         |  6 +++
 pkg/tools/ssl/gotls.go                             |  2 +-
 9 files changed, 89 insertions(+), 50 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/connection.go 
b/pkg/accesslog/collector/protocols/connection.go
index 7ae9325..b6a9e24 100644
--- a/pkg/accesslog/collector/protocols/connection.go
+++ b/pkg/accesslog/collector/protocols/connection.go
@@ -29,8 +29,8 @@ import (
 
 type PartitionConnection struct {
        connectionID, randomID uint64
-       dataBuffer             *buffer.Buffer
-       protocol               map[enums.ConnectionProtocol]bool
+       dataBuffers            map[enums.ConnectionProtocol]*buffer.Buffer
+       protocol               map[enums.ConnectionProtocol]uint64 // protocol 
with minimal data id
        protocolAnalyzer       map[enums.ConnectionProtocol]Protocol
        protocolMetrics        map[enums.ConnectionProtocol]ProtocolMetrics
        closed                 bool
@@ -48,8 +48,8 @@ func (p *PartitionConnection) IsExistProtocol(protocol 
enums.ConnectionProtocol)
        return exist
 }
 
-func (p *PartitionConnection) Buffer() *buffer.Buffer {
-       return p.dataBuffer
+func (p *PartitionConnection) Buffer(protocol enums.ConnectionProtocol) 
*buffer.Buffer {
+       return p.dataBuffers[protocol]
 }
 
 func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, 
detail events.SocketDetail) {
@@ -58,12 +58,12 @@ func (p *PartitionConnection) AppendDetail(ctx 
*common.AccessLogContext, detail
                forwarder.SendTransferNoProtocolEvent(ctx, detail)
                return
        }
-       p.dataBuffer.AppendDetailEvent(detail)
+       p.dataBuffers[detail.GetProtocol()].AppendDetailEvent(detail)
 }
 
 func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) {
        if p.skipAllDataAnalyze {
                return
        }
-       p.dataBuffer.AppendDataEvent(data)
+       p.dataBuffers[data.Protocol()].AppendDataEvent(data)
 }
diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index 06b1b49..92a8173 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -68,18 +68,19 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID, 
randomID uint64) Protoc
 
 func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ 
*AnalyzeHelper) error {
        metrics := 
connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
+       buf := connection.Buffer(enums.ConnectionProtocolHTTP)
        http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: 
%d, random ID: %d, data len: %d",
-               metrics.ConnectionID, metrics.RandomID, 
connection.Buffer().DataLength())
-       connection.Buffer().ResetForLoopReading()
+               metrics.ConnectionID, metrics.RandomID, buf.DataLength())
+       buf.ResetForLoopReading()
        for {
-               if !connection.Buffer().PrepareForReading() {
+               if !buf.PrepareForReading() {
                        return nil
                }
 
-               messageType, err := 
reader.IdentityMessageType(connection.Buffer())
+               messageType, err := reader.IdentityMessageType(buf)
                if err != nil {
                        http1Log.Debugf("failed to identity message type, %v", 
err)
-                       if connection.Buffer().SkipCurrentElement() {
+                       if buf.SkipCurrentElement() {
                                break
                        }
                        continue
@@ -88,9 +89,9 @@ func (p *HTTP1Protocol) Analyze(connection 
*PartitionConnection, _ *AnalyzeHelpe
                var result enums.ParseResult
                switch messageType {
                case reader.MessageTypeRequest:
-                       result, _ = p.handleRequest(metrics, 
connection.Buffer())
+                       result, _ = p.handleRequest(metrics, buf)
                case reader.MessageTypeResponse:
-                       result, _ = p.handleResponse(metrics, 
connection.Buffer())
+                       result, _ = p.handleResponse(metrics, buf)
                case reader.MessageTypeUnknown:
                        result = enums.ParseResultSkipPackage
                }
@@ -98,9 +99,9 @@ func (p *HTTP1Protocol) Analyze(connection 
*PartitionConnection, _ *AnalyzeHelpe
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = connection.Buffer().RemoveReadElements()
+                       finishReading = buf.RemoveReadElements()
                case enums.ParseResultSkipPackage:
-                       finishReading = connection.Buffer().SkipCurrentElement()
+                       finishReading = buf.SkipCurrentElement()
                }
 
                if finishReading {
diff --git a/pkg/accesslog/collector/protocols/http2.go 
b/pkg/accesslog/collector/protocols/http2.go
index 486533c..b18226a 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -89,19 +89,20 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID, 
randomID uint64) Protoc
 
 func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper 
*AnalyzeHelper) error {
        http2Metrics := 
connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
+       buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
        http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: 
%d, random ID: %d",
                http2Metrics.connectionID, http2Metrics.randomID)
-       connection.Buffer().ResetForLoopReading()
+       buf.ResetForLoopReading()
        for {
-               if !connection.Buffer().PrepareForReading() {
+               if !buf.PrepareForReading() {
                        return nil
                }
 
-               startPosition := connection.Buffer().Position()
-               header, err := http2.ReadFrameHeader(connection.Buffer())
+               startPosition := buf.Position()
+               header, err := http2.ReadFrameHeader(buf)
                if err != nil {
                        http2Log.Debugf("failed to read frame header, %v", err)
-                       if connection.Buffer().SkipCurrentElement() {
+                       if buf.SkipCurrentElement() {
                                break
                        }
                        continue
@@ -112,12 +113,12 @@ func (r *HTTP2Protocol) Analyze(connection 
*PartitionConnection, helper *Analyze
                var result enums.ParseResult
                switch header.Type {
                case http2.FrameHeaders:
-                       result, protocolBreak, _ = r.handleHeader(&header, 
startPosition, http2Metrics, connection.Buffer())
+                       result, protocolBreak, _ = r.handleHeader(&header, 
startPosition, http2Metrics, buf)
                case http2.FrameData:
-                       result, protocolBreak, _ = r.handleData(&header, 
startPosition, http2Metrics, connection.Buffer())
+                       result, protocolBreak, _ = r.handleData(&header, 
startPosition, http2Metrics, buf)
                default:
                        tmp := make([]byte, header.Length)
-                       if err := connection.Buffer().ReadUntilBufferFull(tmp); 
err != nil {
+                       if err := buf.ReadUntilBufferFull(tmp); err != nil {
                                if errors.Is(err, buffer.ErrNotComplete) {
                                        result = enums.ParseResultSkipPackage
                                } else {
@@ -139,9 +140,9 @@ func (r *HTTP2Protocol) Analyze(connection 
*PartitionConnection, helper *Analyze
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = connection.Buffer().RemoveReadElements()
+                       finishReading = buf.RemoveReadElements()
                case enums.ParseResultSkipPackage:
-                       finishReading = connection.Buffer().SkipCurrentElement()
+                       finishReading = buf.SkipCurrentElement()
                }
 
                if finishReading {
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index e505d3e..ad66584 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "os"
+       "sort"
        "sync"
        "time"
 
@@ -119,25 +120,30 @@ type PartitionContext struct {
        analyzeLocker sync.Mutex
 }
 
-func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID 
uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
+func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID 
uint64,
+       protocol enums.ConnectionProtocol, currentDataID uint64) 
*PartitionConnection {
        connection := &PartitionConnection{
                connectionID:     conID,
                randomID:         randomID,
-               dataBuffer:       buffer.NewBuffer(),
-               protocol:         make(map[enums.ConnectionProtocol]bool),
+               dataBuffers:      
make(map[enums.ConnectionProtocol]*buffer.Buffer),
+               protocol:         make(map[enums.ConnectionProtocol]uint64),
                protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
                protocolMetrics:  
make(map[enums.ConnectionProtocol]ProtocolMetrics),
        }
-       connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol)
+       connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, 
currentDataID)
        return connection
 }
 
-func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr 
*ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) {
-       if _, exist := p.protocol[protocol]; !exist {
+func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr 
*ProtocolManager, conID, randomID uint64,
+       protocol enums.ConnectionProtocol, currentDataID uint64) {
+       if minDataID, exist := p.protocol[protocol]; !exist {
                analyzer := protocolMgr.GetProtocol(protocol)
-               p.protocol[protocol] = true
+               p.protocol[protocol] = currentDataID
+               p.dataBuffers[protocol] = buffer.NewBuffer()
                p.protocolAnalyzer[protocol] = analyzer
                p.protocolMetrics[protocol] = 
analyzer.GenerateConnection(conID, randomID)
+       } else if currentDataID < minDataID {
+               p.protocol[protocol] = currentDataID
        }
 }
 
@@ -212,26 +218,27 @@ func (p *PartitionContext) Consume(data interface{}) {
                        forwarder.SendTransferNoProtocolEvent(p.context, event)
                        return
                }
-               connection := p.getConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol())
+               connection := p.getConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol(), event.DataID())
                connection.AppendDetail(p.context, event)
        case *events.SocketDataUploadEvent:
                pid, _ := events.ParseConnectionID(event.ConnectionID)
                log.Debugf("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
-                       event.ConnectionID, event.RandomID, pid, event.DataID0, 
event.Sequence0, event.Protocol)
-               connection := p.getConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol)
+                       event.ConnectionID, event.RandomID, pid, event.DataID0, 
event.Sequence0, event.Protocol0)
+               connection := p.getConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol0, event.DataID0)
                connection.AppendData(event)
        }
 }
 
-func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, 
protocol enums.ConnectionProtocol) *PartitionConnection {
+func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
+       protocol enums.ConnectionProtocol, currentDataID uint64) 
*PartitionConnection {
        conKey := p.buildConnectionKey(connectionID, randomID)
        conn, exist := p.connections.Get(conKey)
        if exist {
                connection := conn.(*PartitionConnection)
-               connection.appendProtocolIfNeed(p.protocolMgr, connectionID, 
randomID, protocol)
+               connection.appendProtocolIfNeed(p.protocolMgr, connectionID, 
randomID, protocol, currentDataID)
                return connection
        }
-       result := newPartitionConnection(p.protocolMgr, connectionID, randomID, 
protocol)
+       result := newPartitionConnection(p.protocolMgr, connectionID, randomID, 
protocol, currentDataID)
        p.connections.Set(conKey, result)
        return result
 }
@@ -254,7 +261,10 @@ func (p *PartitionContext) processEvents() {
                p.processConnectionEvents(info)
 
                // if the connection already closed and not contains any buffer 
data, then delete the connection
-               bufLen := info.dataBuffer.DataLength()
+               var bufLen = 0
+               for _, buf := range info.dataBuffers {
+                       bufLen += buf.DataLength()
+               }
                if bufLen > 0 {
                        return
                }
@@ -309,9 +319,11 @@ func (p *PartitionContext) processExpireEvents() {
 }
 
 func (p *PartitionContext) processConnectionExpireEvents(connection 
*PartitionConnection) {
-       if c := 
connection.dataBuffer.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
-               log.Debugf("total removed %d expired socket data events from 
connection ID: %d, random ID: %d", c,
-                       connection.connectionID, connection.randomID)
+       for _, buf := range connection.dataBuffers {
+               if c := buf.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
+                       log.Debugf("total removed %d expired socket data events 
from connection ID: %d, random ID: %d", c,
+                               connection.connectionID, connection.randomID)
+               }
        }
 }
 
@@ -320,8 +332,17 @@ func (p *PartitionContext) 
processConnectionEvents(connection *PartitionConnecti
                return
        }
        helper := &AnalyzeHelper{}
-       for protocol, analyzer := range connection.protocolAnalyzer {
-               if err := analyzer.Analyze(connection, helper); err != nil {
+
+       // since the socket data/detail are getting unsorted, so rover need to 
using the minimal data id to analyze to ensure the order
+       sortedProtocols := make([]enums.ConnectionProtocol, 0, 
len(connection.protocol))
+       for protocol := range connection.protocol {
+               sortedProtocols = append(sortedProtocols, protocol)
+       }
+       sort.Slice(sortedProtocols, func(i, j int) bool {
+               return connection.protocol[sortedProtocols[i]] < 
connection.protocol[sortedProtocols[j]]
+       })
+       for _, protocol := range sortedProtocols {
+               if err := 
connection.protocolAnalyzer[protocol].Analyze(connection, helper); err != nil {
                        log.Warnf("failed to analyze the %s protocol data: %v", 
enums.ConnectionProtocolString(protocol), err)
                }
        }
@@ -330,6 +351,8 @@ func (p *PartitionContext) 
processConnectionEvents(connection *PartitionConnecti
                // notify the connection manager to skip analyze all data(just 
sending the detail)
                connection.skipAllDataAnalyze = true
                
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, 
connection.randomID)
-               connection.dataBuffer.Clean()
+               for _, buf := range connection.dataBuffers {
+                       buf.Clean()
+               }
        }
 }
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 747ff9f..ed48059 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -24,7 +24,7 @@ import (
 )
 
 type SocketDataUploadEvent struct {
-       Protocol     enums.ConnectionProtocol
+       Protocol0    enums.ConnectionProtocol
        HaveReduce   uint8
        Direction0   enums.SocketDataDirection
        Finished     uint8
@@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
        Buffer       [2048]byte
 }
 
+func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
+       return s.Protocol0
+}
+
 func (s *SocketDataUploadEvent) GenerateConnectionID() string {
        return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
 }
diff --git a/pkg/profiling/task/network/analyze/events/data.go 
b/pkg/profiling/task/network/analyze/events/data.go
index 9ca242d..804e96b 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -24,7 +24,7 @@ import (
 )
 
 type SocketDataUploadEvent struct {
-       Protocol     enums.ConnectionProtocol
+       Protocol0    enums.ConnectionProtocol
        HaveReduce   uint8
        Direction0   enums.SocketDataDirection
        Finished     uint8
@@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
        Buffer       [2048]byte
 }
 
+func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
+       return s.Protocol0
+}
+
 func (s *SocketDataUploadEvent) GenerateConnectionID() string {
        return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 60bcd71..0b8c887 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -71,10 +71,10 @@ func (a *Analyzer) Start(ctx context.Context) {
 }
 
 func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent) 
{
-       analyzer := a.protocols[event.Protocol]
+       analyzer := a.protocols[event.Protocol()]
        if analyzer == nil {
                log.Warnf("could not found any protocol to handle socket data, 
connection id: %s, protocol: %s(%d)",
-                       event.GenerateConnectionID(), 
enums.ConnectionProtocolString(event.Protocol), event.Protocol)
+                       event.GenerateConnectionID(), 
enums.ConnectionProtocolString(event.Protocol()), event.Protocol())
                return
        }
        analyzer.ReceiveSocketData(a.ctx, event)
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 1c71f7f..0bfc5d9 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -33,6 +33,8 @@ var (
 )
 
 type SocketDataBuffer interface {
+       // Protocol of the buffer
+       Protocol() enums.ConnectionProtocol
        // GenerateConnectionID for identity the buffer belong which connection
        GenerateConnectionID() string
        // BufferData of the buffer
@@ -88,6 +90,10 @@ type SocketDataEventLimited struct {
        Size int
 }
 
+func (s *SocketDataEventLimited) Protocol() enums.ConnectionProtocol {
+       return s.SocketDataBuffer.Protocol()
+}
+
 func (s *SocketDataEventLimited) BufferData() []byte {
        return s.SocketDataBuffer.BufferData()[s.From:s.Size]
 }
diff --git a/pkg/tools/ssl/gotls.go b/pkg/tools/ssl/gotls.go
index 2b2b3d7..573854e 100644
--- a/pkg/tools/ssl/gotls.go
+++ b/pkg/tools/ssl/gotls.go
@@ -206,7 +206,7 @@ func (r *Register) generateGOTLSSymbolOffsets(register 
*Register, elfFile *elf.F
 
        sym := register.SearchSymbol(func(a, b string) bool {
                return a == b
-       }, "go.itab.*net.TCPConn,net.Conn")
+       }, "go.itab.*net.TCPConn,net.Conn", "go:itab.*net.TCPConn,net.Conn")
        if sym == nil {
                log.Warnf("could not found the tcp connection symbol: 
go.itab.*net.TCPConn,net.Conn")
                return nil, nil

Reply via email to