This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new e73c782 Support analyze multiple protocol in the same connection (#160)
e73c782 is described below
commit e73c7829172bb7c9eb96f1ca3ba1f5fd304c2553
Author: mrproliu <[email protected]>
AuthorDate: Thu Nov 28 19:31:24 2024 +0900
Support analyze multiple protocol in the same connection (#160)
---
pkg/accesslog/collector/protocols/connection.go | 12 ++---
pkg/accesslog/collector/protocols/http1.go | 19 +++----
pkg/accesslog/collector/protocols/http2.go | 21 ++++----
pkg/accesslog/collector/protocols/queue.go | 63 +++++++++++++++-------
pkg/accesslog/events/data.go | 6 ++-
pkg/profiling/task/network/analyze/events/data.go | 6 ++-
.../network/analyze/layer7/protocols/protocols.go | 4 +-
pkg/tools/buffer/buffer.go | 6 +++
pkg/tools/ssl/gotls.go | 2 +-
9 files changed, 89 insertions(+), 50 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/connection.go
b/pkg/accesslog/collector/protocols/connection.go
index 7ae9325..b6a9e24 100644
--- a/pkg/accesslog/collector/protocols/connection.go
+++ b/pkg/accesslog/collector/protocols/connection.go
@@ -29,8 +29,8 @@ import (
type PartitionConnection struct {
connectionID, randomID uint64
- dataBuffer *buffer.Buffer
- protocol map[enums.ConnectionProtocol]bool
+ dataBuffers map[enums.ConnectionProtocol]*buffer.Buffer
+ protocol map[enums.ConnectionProtocol]uint64 // protocol with minimal data id
protocolAnalyzer map[enums.ConnectionProtocol]Protocol
protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics
closed bool
@@ -48,8 +48,8 @@ func (p *PartitionConnection) IsExistProtocol(protocol
enums.ConnectionProtocol)
return exist
}
-func (p *PartitionConnection) Buffer() *buffer.Buffer {
- return p.dataBuffer
+func (p *PartitionConnection) Buffer(protocol enums.ConnectionProtocol)
*buffer.Buffer {
+ return p.dataBuffers[protocol]
}
func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext,
detail events.SocketDetail) {
@@ -58,12 +58,12 @@ func (p *PartitionConnection) AppendDetail(ctx
*common.AccessLogContext, detail
forwarder.SendTransferNoProtocolEvent(ctx, detail)
return
}
- p.dataBuffer.AppendDetailEvent(detail)
+ p.dataBuffers[detail.GetProtocol()].AppendDetailEvent(detail)
}
func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) {
if p.skipAllDataAnalyze {
return
}
- p.dataBuffer.AppendDataEvent(data)
+ p.dataBuffers[data.Protocol()].AppendDataEvent(data)
}
diff --git a/pkg/accesslog/collector/protocols/http1.go
b/pkg/accesslog/collector/protocols/http1.go
index 06b1b49..92a8173 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -68,18 +68,19 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID,
randomID uint64) Protoc
func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _
*AnalyzeHelper) error {
metrics :=
connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
+ buf := connection.Buffer(enums.ConnectionProtocolHTTP)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID:
%d, random ID: %d, data len: %d",
- metrics.ConnectionID, metrics.RandomID,
connection.Buffer().DataLength())
- connection.Buffer().ResetForLoopReading()
+ metrics.ConnectionID, metrics.RandomID, buf.DataLength())
+ buf.ResetForLoopReading()
for {
- if !connection.Buffer().PrepareForReading() {
+ if !buf.PrepareForReading() {
return nil
}
- messageType, err :=
reader.IdentityMessageType(connection.Buffer())
+ messageType, err := reader.IdentityMessageType(buf)
if err != nil {
http1Log.Debugf("failed to identity message type, %v",
err)
- if connection.Buffer().SkipCurrentElement() {
+ if buf.SkipCurrentElement() {
break
}
continue
@@ -88,9 +89,9 @@ func (p *HTTP1Protocol) Analyze(connection
*PartitionConnection, _ *AnalyzeHelpe
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
- result, _ = p.handleRequest(metrics,
connection.Buffer())
+ result, _ = p.handleRequest(metrics, buf)
case reader.MessageTypeResponse:
- result, _ = p.handleResponse(metrics,
connection.Buffer())
+ result, _ = p.handleResponse(metrics, buf)
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}
@@ -98,9 +99,9 @@ func (p *HTTP1Protocol) Analyze(connection
*PartitionConnection, _ *AnalyzeHelpe
finishReading := false
switch result {
case enums.ParseResultSuccess:
- finishReading = connection.Buffer().RemoveReadElements()
+ finishReading = buf.RemoveReadElements()
case enums.ParseResultSkipPackage:
- finishReading = connection.Buffer().SkipCurrentElement()
+ finishReading = buf.SkipCurrentElement()
}
if finishReading {
diff --git a/pkg/accesslog/collector/protocols/http2.go
b/pkg/accesslog/collector/protocols/http2.go
index 486533c..b18226a 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -89,19 +89,20 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID,
randomID uint64) Protoc
func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper
*AnalyzeHelper) error {
http2Metrics :=
connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
+ buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID:
%d, random ID: %d",
http2Metrics.connectionID, http2Metrics.randomID)
- connection.Buffer().ResetForLoopReading()
+ buf.ResetForLoopReading()
for {
- if !connection.Buffer().PrepareForReading() {
+ if !buf.PrepareForReading() {
return nil
}
- startPosition := connection.Buffer().Position()
- header, err := http2.ReadFrameHeader(connection.Buffer())
+ startPosition := buf.Position()
+ header, err := http2.ReadFrameHeader(buf)
if err != nil {
http2Log.Debugf("failed to read frame header, %v", err)
- if connection.Buffer().SkipCurrentElement() {
+ if buf.SkipCurrentElement() {
break
}
continue
@@ -112,12 +113,12 @@ func (r *HTTP2Protocol) Analyze(connection
*PartitionConnection, helper *Analyze
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
- result, protocolBreak, _ = r.handleHeader(&header,
startPosition, http2Metrics, connection.Buffer())
+ result, protocolBreak, _ = r.handleHeader(&header,
startPosition, http2Metrics, buf)
case http2.FrameData:
- result, protocolBreak, _ = r.handleData(&header,
startPosition, http2Metrics, connection.Buffer())
+ result, protocolBreak, _ = r.handleData(&header,
startPosition, http2Metrics, buf)
default:
tmp := make([]byte, header.Length)
- if err := connection.Buffer().ReadUntilBufferFull(tmp);
err != nil {
+ if err := buf.ReadUntilBufferFull(tmp); err != nil {
if errors.Is(err, buffer.ErrNotComplete) {
result = enums.ParseResultSkipPackage
} else {
@@ -139,9 +140,9 @@ func (r *HTTP2Protocol) Analyze(connection
*PartitionConnection, helper *Analyze
finishReading := false
switch result {
case enums.ParseResultSuccess:
- finishReading = connection.Buffer().RemoveReadElements()
+ finishReading = buf.RemoveReadElements()
case enums.ParseResultSkipPackage:
- finishReading = connection.Buffer().SkipCurrentElement()
+ finishReading = buf.SkipCurrentElement()
}
if finishReading {
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index e505d3e..ad66584 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"os"
+ "sort"
"sync"
"time"
@@ -119,25 +120,30 @@ type PartitionContext struct {
analyzeLocker sync.Mutex
}
-func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID
uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
+func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID
uint64,
+ protocol enums.ConnectionProtocol, currentDataID uint64)
*PartitionConnection {
connection := &PartitionConnection{
connectionID: conID,
randomID: randomID,
- dataBuffer: buffer.NewBuffer(),
- protocol: make(map[enums.ConnectionProtocol]bool),
+ dataBuffers:
make(map[enums.ConnectionProtocol]*buffer.Buffer),
+ protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics:
make(map[enums.ConnectionProtocol]ProtocolMetrics),
}
- connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol)
+ connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol,
currentDataID)
return connection
}
-func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr
*ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) {
- if _, exist := p.protocol[protocol]; !exist {
+func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr
*ProtocolManager, conID, randomID uint64,
+ protocol enums.ConnectionProtocol, currentDataID uint64) {
+ if minDataID, exist := p.protocol[protocol]; !exist {
analyzer := protocolMgr.GetProtocol(protocol)
- p.protocol[protocol] = true
+ p.protocol[protocol] = currentDataID
+ p.dataBuffers[protocol] = buffer.NewBuffer()
p.protocolAnalyzer[protocol] = analyzer
p.protocolMetrics[protocol] =
analyzer.GenerateConnection(conID, randomID)
+ } else if currentDataID < minDataID {
+ p.protocol[protocol] = currentDataID
}
}
@@ -212,26 +218,27 @@ func (p *PartitionContext) Consume(data interface{}) {
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
- connection := p.getConnectionContext(event.GetConnectionID(),
event.GetRandomID(), event.GetProtocol())
+ connection := p.getConnectionContext(event.GetConnectionID(),
event.GetRandomID(), event.GetProtocol(), event.DataID())
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d,
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
- event.ConnectionID, event.RandomID, pid, event.DataID0,
event.Sequence0, event.Protocol)
- connection := p.getConnectionContext(event.ConnectionID,
event.RandomID, event.Protocol)
+ event.ConnectionID, event.RandomID, pid, event.DataID0,
event.Sequence0, event.Protocol0)
+ connection := p.getConnectionContext(event.ConnectionID,
event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
}
-func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
protocol enums.ConnectionProtocol) *PartitionConnection {
+func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
+ protocol enums.ConnectionProtocol, currentDataID uint64)
*PartitionConnection {
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
if exist {
connection := conn.(*PartitionConnection)
- connection.appendProtocolIfNeed(p.protocolMgr, connectionID,
randomID, protocol)
+ connection.appendProtocolIfNeed(p.protocolMgr, connectionID,
randomID, protocol, currentDataID)
return connection
}
- result := newPartitionConnection(p.protocolMgr, connectionID, randomID,
protocol)
+ result := newPartitionConnection(p.protocolMgr, connectionID, randomID,
protocol, currentDataID)
p.connections.Set(conKey, result)
return result
}
@@ -254,7 +261,10 @@ func (p *PartitionContext) processEvents() {
p.processConnectionEvents(info)
// if the connection already closed and not contains any buffer data, then delete the connection
- bufLen := info.dataBuffer.DataLength()
+ var bufLen = 0
+ for _, buf := range info.dataBuffers {
+ bufLen += buf.DataLength()
+ }
if bufLen > 0 {
return
}
@@ -309,9 +319,11 @@ func (p *PartitionContext) processExpireEvents() {
}
func (p *PartitionContext) processConnectionExpireEvents(connection
*PartitionConnection) {
- if c :=
connection.dataBuffer.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
- log.Debugf("total removed %d expired socket data events from
connection ID: %d, random ID: %d", c,
- connection.connectionID, connection.randomID)
+ for _, buf := range connection.dataBuffers {
+ if c := buf.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
+ log.Debugf("total removed %d expired socket data events
from connection ID: %d, random ID: %d", c,
+ connection.connectionID, connection.randomID)
+ }
}
}
@@ -320,8 +332,17 @@ func (p *PartitionContext)
processConnectionEvents(connection *PartitionConnecti
return
}
helper := &AnalyzeHelper{}
- for protocol, analyzer := range connection.protocolAnalyzer {
- if err := analyzer.Analyze(connection, helper); err != nil {
+
+ // since the socket data/detail are getting unsorted, so rover need to using the minimal data id to analyze to ensure the order
+ sortedProtocols := make([]enums.ConnectionProtocol, 0,
len(connection.protocol))
+ for protocol := range connection.protocol {
+ sortedProtocols = append(sortedProtocols, protocol)
+ }
+ sort.Slice(sortedProtocols, func(i, j int) bool {
+ return connection.protocol[sortedProtocols[i]] <
connection.protocol[sortedProtocols[j]]
+ })
+ for _, protocol := range sortedProtocols {
+ if err :=
connection.protocolAnalyzer[protocol].Analyze(connection, helper); err != nil {
log.Warnf("failed to analyze the %s protocol data: %v",
enums.ConnectionProtocolString(protocol), err)
}
}
@@ -330,6 +351,8 @@ func (p *PartitionContext)
processConnectionEvents(connection *PartitionConnecti
// notify the connection manager to skip analyze all data(just sending the detail)
connection.skipAllDataAnalyze = true
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID,
connection.randomID)
- connection.dataBuffer.Clean()
+ for _, buf := range connection.dataBuffers {
+ buf.Clean()
+ }
}
}
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index 747ff9f..ed48059 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -24,7 +24,7 @@ import (
)
type SocketDataUploadEvent struct {
- Protocol enums.ConnectionProtocol
+ Protocol0 enums.ConnectionProtocol
HaveReduce uint8
Direction0 enums.SocketDataDirection
Finished uint8
@@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}
+func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
+ return s.Protocol0
+}
+
func (s *SocketDataUploadEvent) GenerateConnectionID() string {
return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
}
diff --git a/pkg/profiling/task/network/analyze/events/data.go
b/pkg/profiling/task/network/analyze/events/data.go
index 9ca242d..804e96b 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -24,7 +24,7 @@ import (
)
type SocketDataUploadEvent struct {
- Protocol enums.ConnectionProtocol
+ Protocol0 enums.ConnectionProtocol
HaveReduce uint8
Direction0 enums.SocketDataDirection
Finished uint8
@@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}
+func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
+ return s.Protocol0
+}
+
func (s *SocketDataUploadEvent) GenerateConnectionID() string {
return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 60bcd71..0b8c887 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -71,10 +71,10 @@ func (a *Analyzer) Start(ctx context.Context) {
}
func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent)
{
- analyzer := a.protocols[event.Protocol]
+ analyzer := a.protocols[event.Protocol()]
if analyzer == nil {
log.Warnf("could not found any protocol to handle socket data,
connection id: %s, protocol: %s(%d)",
- event.GenerateConnectionID(),
enums.ConnectionProtocolString(event.Protocol), event.Protocol)
+ event.GenerateConnectionID(),
enums.ConnectionProtocolString(event.Protocol()), event.Protocol())
return
}
analyzer.ReceiveSocketData(a.ctx, event)
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 1c71f7f..0bfc5d9 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -33,6 +33,8 @@ var (
)
type SocketDataBuffer interface {
+ // Protocol of the buffer
+ Protocol() enums.ConnectionProtocol
// GenerateConnectionID for identity the buffer belong which connection
GenerateConnectionID() string
// BufferData of the buffer
@@ -88,6 +90,10 @@ type SocketDataEventLimited struct {
Size int
}
+func (s *SocketDataEventLimited) Protocol() enums.ConnectionProtocol {
+ return s.SocketDataBuffer.Protocol()
+}
+
func (s *SocketDataEventLimited) BufferData() []byte {
return s.SocketDataBuffer.BufferData()[s.From:s.Size]
}
diff --git a/pkg/tools/ssl/gotls.go b/pkg/tools/ssl/gotls.go
index 2b2b3d7..573854e 100644
--- a/pkg/tools/ssl/gotls.go
+++ b/pkg/tools/ssl/gotls.go
@@ -206,7 +206,7 @@ func (r *Register) generateGOTLSSymbolOffsets(register
*Register, elfFile *elf.F
sym := register.SearchSymbol(func(a, b string) bool {
return a == b
- }, "go.itab.*net.TCPConn,net.Conn")
+ }, "go.itab.*net.TCPConn,net.Conn", "go:itab.*net.TCPConn,net.Conn")
if sym == nil {
log.Warnf("could not found the tcp connection symbol:
go.itab.*net.TCPConn,net.Conn")
return nil, nil