This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 1641294 Downgrade the protocol of connection when protocol break
(#181)
1641294 is described below
commit 16412947893c1026ee35a957b4c1ea8f0d32699f
Author: mrproliu <[email protected]>
AuthorDate: Tue Mar 4 12:02:01 2025 +0800
Downgrade the protocol of connection when protocol break (#181)
---
CHANGES.md | 1 +
bpf/include/protocol_analyzer.h | 12 ++++-
pkg/accesslog/collector/protocols/http1.go | 26 +++++++----
pkg/accesslog/collector/protocols/http2.go | 74 +++++++++++++++++-------------
pkg/accesslog/collector/protocols/queue.go | 5 +-
pkg/accesslog/common/connection.go | 55 ++++++++++++++++------
6 files changed, 114 insertions(+), 59 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 16fc6fd..f35c28c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
* Reduce unessential `conntrack` query when detect new connection.
* Reduce CPU and memory usage in the access log module.
* Reduce handle connection event time in the access log module.
+* Downgrade the protocol of connection when protocol break in the access log
module.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/bpf/include/protocol_analyzer.h b/bpf/include/protocol_analyzer.h
index 7efc85e..7103e4f 100644
--- a/bpf/include/protocol_analyzer.h
+++ b/bpf/include/protocol_analyzer.h
@@ -124,6 +124,10 @@ static __inline __u32 infer_http2_message(const char* buf,
size_t count) {
bpf_probe_read(frame, sizeof(frame), buf + frameOffset);
frameOffset += (bpf_ntohl(*(__u32 *) frame) >> 8) + kFrameBasicSize;
+ // frametype only accept 0x00 - 0x09
+ if (frame[3] > 0x09) {
+ return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+ }
// is header frame
if (frame[3] != kFrameTypeHeader) {
continue;
@@ -135,9 +139,15 @@ static __inline __u32 infer_http2_message(const char* buf,
size_t count) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}
+ // stream ID cannot be 0
+ __u32 streamID = ((frame[5] & 0x7F) << 24) | (frame[6] << 16) |
(frame[7] << 8) | frame[8];
+ if (streamID == 0) {
+ return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+ }
+
// locate the header block fragment offset
headerBlockFragmentOffset = kFrameBasicSize;
- if (frame[4] & 0x20) { // PADDED flag is set
+ if (frame[4] & 0x08) { // PADDED flag is set
headerBlockFragmentOffset += 1;
}
if (frame[4] & 0x20) { // PRIORITY flag is set
diff --git a/pkg/accesslog/collector/protocols/http1.go
b/pkg/accesslog/collector/protocols/http1.go
index 3039f00..695e9c4 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -36,21 +36,24 @@ import (
var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
var http1AnalyzeMaxRetryCount = 3
-type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, connection
*PartitionConnection,
- request *reader.Request, response *reader.Response) error
+type HTTP1ProtocolAnalyzer interface {
+ HandleHTTPData(metrics *HTTP1Metrics, connection *PartitionConnection,
+ request *reader.Request, response *reader.Response) error
+ OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection)
+}
type HTTP1Protocol struct {
- ctx *common.AccessLogContext
- analyze HTTP1ProtocolAnalyze
- reader *reader.Reader
+ ctx *common.AccessLogContext
+ analyzer HTTP1ProtocolAnalyzer
+ reader *reader.Reader
}
-func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze
HTTP1ProtocolAnalyze) *HTTP1Protocol {
+func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze
HTTP1ProtocolAnalyzer) *HTTP1Protocol {
protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
if analyze == nil {
- protocol.analyze = protocol.HandleHTTPData
+ protocol.analyzer = protocol
} else {
- protocol.analyze = analyze
+ protocol.analyzer = analyze
}
return protocol
}
@@ -174,7 +177,7 @@ func (p *HTTP1Protocol) handleResponse(metrics
*HTTP1Metrics, connection *Partit
}
// getting the request and response, then send to the forwarder
- if analyzeError := p.analyze(metrics, connection, request, response);
analyzeError != nil {
+ if analyzeError := p.analyzer.HandleHTTPData(metrics, connection,
request, response); analyzeError != nil {
p.appendAnalyzeUnFinished(metrics, request, response)
}
return enums.ParseResultSuccess, nil
@@ -191,7 +194,7 @@ func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics
*HTTP1Metrics, request *
func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics, connection
*PartitionConnection) {
for element := m.analyzeUnFinished.Front(); element != nil; {
unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
- err := p.analyze(m, connection, unFinished.request,
unFinished.response)
+ err := p.analyzer.HandleHTTPData(m, connection,
unFinished.request, unFinished.response)
if err != nil {
unFinished.retryCount++
if unFinished.retryCount < http1AnalyzeMaxRetryCount {
@@ -266,6 +269,9 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics
*HTTP1Metrics, connection *Partit
return nil
}
+func (p *HTTP1Protocol) OnProtocolBreak(metrics *HTTP1Metrics, connection
*PartitionConnection) {
+}
+
func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
if ioReader != nil {
_ = ioReader.Close()
diff --git a/pkg/accesslog/collector/protocols/http2.go
b/pkg/accesslog/collector/protocols/http2.go
index af81d92..74b14f5 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -43,29 +43,32 @@ var maxHTTP2StreamingTime = time.Minute * 3
var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
-type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error
+type HTTP2StreamAnalyzer interface {
+ HandleWholeStream(connection *PartitionConnection, stream
*HTTP2Streaming) error
+ OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics)
+}
type HTTP2Protocol struct {
- ctx *common.AccessLogContext
- analyze HTTP2StreamAnalyze
+ ctx *common.AccessLogContext
+ analyzer HTTP2StreamAnalyzer
}
-func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyze
HTTP2StreamAnalyze) *HTTP2Protocol {
+func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyzer
HTTP2StreamAnalyzer) *HTTP2Protocol {
protocol := &HTTP2Protocol{ctx: ctx}
- if analyze == nil {
- protocol.analyze = protocol.handleWholeStream
+ if analyzer == nil {
+ protocol.analyzer = protocol
} else {
- protocol.analyze = analyze
+ protocol.analyzer = analyzer
}
return protocol
}
type HTTP2Metrics struct {
- connectionID uint64
- randomID uint64
- hpackDecoder *hpack.Decoder
+ ConnectionID uint64
+ RandomID uint64
+ HpackDecoder *hpack.Decoder
- streams map[uint32]*HTTP2Streaming
+ Streams map[uint32]*HTTP2Streaming
}
type HTTP2Streaming struct {
@@ -82,10 +85,10 @@ type HTTP2Streaming struct {
func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64)
ProtocolMetrics {
return &HTTP2Metrics{
- connectionID: connectionID,
- randomID: randomID,
- hpackDecoder: hpack.NewDecoder(4096, nil),
- streams: make(map[uint32]*HTTP2Streaming),
+ ConnectionID: connectionID,
+ RandomID: randomID,
+ HpackDecoder: hpack.NewDecoder(4096, nil),
+ Streams: make(map[uint32]*HTTP2Streaming),
}
}
@@ -93,7 +96,7 @@ func (r *HTTP2Protocol) Analyze(connection
*PartitionConnection, helper *Analyze
http2Metrics :=
connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID:
%d, random ID: %d",
- http2Metrics.connectionID, http2Metrics.randomID)
+ http2Metrics.ConnectionID, http2Metrics.RandomID)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
@@ -115,9 +118,9 @@ func (r *HTTP2Protocol) Analyze(connection
*PartitionConnection, helper *Analyze
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
- result, protocolBreak, _ = r.handleHeader(connection,
&header, startPosition, http2Metrics, buf)
+ result, protocolBreak, _ = r.HandleHeader(connection,
&header, startPosition, http2Metrics, buf)
case http2.FrameData:
- result, protocolBreak, _ = r.handleData(&header,
startPosition, http2Metrics, buf)
+ result, protocolBreak, _ = r.HandleData(connection,
&header, startPosition, http2Metrics, buf)
default:
tmp := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(tmp); err != nil {
@@ -134,8 +137,9 @@ func (r *HTTP2Protocol) Analyze(connection
*PartitionConnection, helper *Analyze
// if the protocol break, then stop the loop and notify the
caller to skip analyze all data(just sending the detail)
if protocolBreak {
http2Log.Warnf("the HTTP/2 protocol break, maybe not
tracing the connection from beginning, skip all data analyze in this
connection, "+
- "connection ID: %d", http2Metrics.connectionID)
+ "connection ID: %d", http2Metrics.ConnectionID)
helper.ProtocolBreak = true
+ r.analyzer.OnProtocolBreak(connection, http2Metrics)
break
}
@@ -159,19 +163,19 @@ func (r *HTTP2Protocol) ForProtocol()
enums.ConnectionProtocol {
return enums.ConnectionProtocolHTTP2
}
-func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header
*http2.FrameHeader, startPos *buffer.Position,
+func (r *HTTP2Protocol) HandleHeader(connection *PartitionConnection, header
*http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool,
error) {
bytes := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(bytes); err != nil {
- return enums.ParseResultSkipPackage, false, err
+ return enums.ParseResultSkipPackage, true, err
}
- headerData, err := metrics.hpackDecoder.DecodeFull(bytes)
+ headerData, err := metrics.HpackDecoder.DecodeFull(bytes)
if err != nil {
// reading the header failure, maybe not tracing the connection
from beginning
return enums.ParseResultSkipPackage, true, err
}
// saving stream
- streaming := metrics.streams[header.StreamID]
+ streaming := metrics.Streams[header.StreamID]
headers := r.parseHeaders(headerData)
if streaming == nil {
streaming = &HTTP2Streaming{
@@ -180,7 +184,7 @@ func (r *HTTP2Protocol) handleHeader(connection
*PartitionConnection, header *ht
ReqHeaderBuffer: buf.Slice(true, startPos,
buf.Position()),
Connection: connection,
}
- metrics.streams[header.StreamID] = streaming
+ metrics.Streams[header.StreamID] = streaming
return enums.ParseResultSuccess, false, nil
}
@@ -207,14 +211,15 @@ func (r *HTTP2Protocol) handleHeader(connection
*PartitionConnection, header *ht
// is end of stream and in the response
if header.Flags.Has(http2.FlagHeadersEndStream) {
// should be end of the stream and send to the protocol
- _ = r.analyze(streaming)
+ _ = r.analyzer.HandleWholeStream(connection, streaming)
// delete streaming
- delete(metrics.streams, header.StreamID)
+ delete(metrics.Streams, header.StreamID)
}
return enums.ParseResultSuccess, false, nil
}
-func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id
uint32, streaming *HTTP2Streaming) {
+func (r *HTTP2Protocol) validateIsStreamOpenTooLong(connection
*PartitionConnection,
+ metrics *HTTP2Metrics, id uint32, streaming *HTTP2Streaming) {
// if in the response mode or the request body is not nil, then skip
if streaming.IsInResponse || streaming.ReqBodyBuffer == nil {
return
@@ -227,9 +232,9 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics
*HTTP2Metrics, id ui
}
if time.Since(host.Time(socketBuffer.StartTime())) >
maxHTTP2StreamingTime {
http2Log.Infof("detect the HTTP/2 stream is too long, split the
stream, connection ID: %d, stream ID: %d, headers: %v",
- metrics.connectionID, id, streaming.ReqHeader)
+ metrics.ConnectionID, id, streaming.ReqHeader)
- _ = r.analyze(streaming)
+ _ = r.analyzer.HandleWholeStream(connection, streaming)
// clean sent buffers
if streaming.ReqBodyBuffer != nil {
@@ -238,7 +243,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics
*HTTP2Metrics, id ui
}
}
-func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
+func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream
*HTTP2Streaming) error {
details := make([]events.SocketDetail, 0)
var allInclude = true
var idRange *buffer.DataIDRange
@@ -285,6 +290,9 @@ func (r *HTTP2Protocol) handleWholeStream(stream
*HTTP2Streaming) error {
return nil
}
+func (r *HTTP2Protocol) OnProtocolBreak(connection *PartitionConnection,
metrics *HTTP2Metrics) {
+}
+
func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming)
v3.AccessLogHTTPProtocolRequestMethod {
method := streaming.ReqHeader[":method"]
if method == "" {
@@ -318,10 +326,10 @@ func (r *HTTP2Protocol) AppendHeaders(exist, needAppends
map[string]string) {
}
}
-func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos
*buffer.Position,
+func (r *HTTP2Protocol) HandleData(connection *PartitionConnection, header
*http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool,
error) {
bytes := make([]byte, header.Length)
- streaming := metrics.streams[header.StreamID]
+ streaming := metrics.Streams[header.StreamID]
if streaming == nil {
// cannot found the stream, maybe not tracing the connection
from beginning
return enums.ParseResultSkipPackage, true, nil
@@ -335,7 +343,7 @@ func (r *HTTP2Protocol) handleData(header
*http2.FrameHeader, startPos *buffer.P
streaming.RespBodyBuffer = buffer.CombineSlices(true, buf,
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
}
- r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
+ r.validateIsStreamOpenTooLong(connection, metrics, header.StreamID,
streaming)
return enums.ParseResultSuccess, false, nil
}
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index b336f9d..0c746d3 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -345,8 +345,11 @@ func (p *PartitionContext)
processConnectionEvents(connection *PartitionConnecti
if helper.ProtocolBreak {
// notify the connection manager to skip analyze all data(just
sending the detail)
connection.skipAllDataAnalyze = true
-
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID,
connection.randomID)
+
p.context.ConnectionMgr.SkipAllDataAnalyzeAndDowngradeProtocol(connection.connectionID,
connection.randomID)
for _, buf := range connection.dataBuffers {
+ for e := buf.BuildDetails().Front(); e != nil; e =
e.Next() {
+
forwarder.SendTransferNoProtocolEvent(p.context, e.Value.(events.SocketDetail))
+ }
buf.Clean()
}
}
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index 1c0a86e..9289b9e 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -42,6 +42,8 @@ import (
cmap "github.com/orcaman/concurrent-map"
+ "k8s.io/apimachinery/pkg/util/cache"
+
v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
@@ -103,6 +105,8 @@ type ConnectionManager struct {
flushListeners []FlusherListener
connectTracker *ip.ConnTrack
+
+ connectionProtocolBreakMap *cache.Expiring
}
func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) {
@@ -126,6 +130,7 @@ type ConnectionInfo struct {
Socket *ip.SocketPair
LastCheckExistTime time.Time
DeleteAfter *time.Time
+ ProtocolBreak bool
}
func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
@@ -134,16 +139,17 @@ func NewConnectionManager(config *Config, moduleMgr
*module.Manager, bpfLoader *
log.Warnf("cannot create the connection tracker, %v", err)
}
mgr := &ConnectionManager{
- moduleMgr: moduleMgr,
- processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
- connections: cmap.New(),
- localIPWithPid: make(map[string]int32),
- monitoringProcesses: make(map[int32][]api.ProcessInterface),
- processMonitorMap: bpfLoader.ProcessMonitorControl,
- activeConnectionMap: bpfLoader.ActiveConnectionMap,
- monitorFilter: filter,
- flushListeners: make([]FlusherListener, 0),
- connectTracker: track,
+ moduleMgr: moduleMgr,
+ processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
+ connections: cmap.New(),
+ localIPWithPid: make(map[string]int32),
+ monitoringProcesses:
make(map[int32][]api.ProcessInterface),
+ processMonitorMap: bpfLoader.ProcessMonitorControl,
+ activeConnectionMap: bpfLoader.ActiveConnectionMap,
+ monitorFilter: filter,
+ flushListeners: make([]FlusherListener, 0),
+ connectTracker: track,
+ connectionProtocolBreakMap: cache.NewExpiring(),
}
return mgr
}
@@ -234,7 +240,7 @@ func (c *ConnectionManager) Find(event events.Event)
*ConnectionInfo {
if localAddress == nil || remoteAddress == nil {
return nil
}
- connection := c.buildConnection(e, socket, localAddress,
remoteAddress)
+ connection := c.buildConnection(e, socket, localAddress,
remoteAddress, connectionKey)
c.connections.Set(connectionKey, connection)
if log.Enable(logrus.DebugLevel) {
log.Debugf("building flushing connection, connection
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d, "+
@@ -284,7 +290,8 @@ func (c *ConnectionManager) connectionPostHandle(connection
*ConnectionInfo, eve
if e.GetSSL() == 1 && connection.RPCConnection.TlsMode ==
v3.AccessLogConnectionTLSMode_Plain {
tlsMode = v3.AccessLogConnectionTLSMode_TLS
}
- if e.GetProtocol() != enums.ConnectionProtocolUnknown &&
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
+ if !connection.ProtocolBreak && e.GetProtocol() !=
enums.ConnectionProtocolUnknown &&
+ connection.RPCConnection.Protocol ==
v3.AccessLogProtocolType_TCP {
switch e.GetProtocol() {
case enums.ConnectionProtocolHTTP:
protocol = v3.AccessLogProtocolType_HTTP_1
@@ -292,6 +299,9 @@ func (c *ConnectionManager) connectionPostHandle(connection
*ConnectionInfo, eve
protocol = v3.AccessLogProtocolType_HTTP_2
}
}
+ if connection.ProtocolBreak &&
connection.RPCConnection.Protocol != v3.AccessLogProtocolType_TCP {
+ protocol = v3.AccessLogProtocolType_TCP
+ }
c.rebuildRPCConnectionWithTLSModeAndProtocol(connection,
tlsMode, protocol)
}
@@ -335,7 +345,7 @@ func (c *ConnectionManager) ProcessIsDetectBy(pid uint32,
detectType api.Process
}
func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent,
socket *ip.SocketPair,
- local, remote *v3.ConnectionAddress) *ConnectionInfo {
+ local, remote *v3.ConnectionAddress, conKey string) *ConnectionInfo {
var role v32.DetectPoint
switch socket.Role {
case enums.ConnectionRoleClient:
@@ -350,6 +360,12 @@ func (c *ConnectionManager) buildConnection(event
*events.SocketConnectEvent, so
TlsMode: v3.AccessLogConnectionTLSMode_Plain,
Protocol: v3.AccessLogProtocolType_TCP,
}
+ val, exist := c.connectionProtocolBreakMap.Get(conKey)
+ protocolBreak := false
+ if exist {
+ protocolBreak = val.(bool)
+ c.connectionProtocolBreakMap.Delete(conKey)
+ }
return &ConnectionInfo{
ConnectionID: event.ConID,
RandomID: event.RandomID,
@@ -357,6 +373,7 @@ func (c *ConnectionManager) buildConnection(event
*events.SocketConnectEvent, so
PID: event.PID,
Socket: socket,
LastCheckExistTime: time.Now(),
+ ProtocolBreak: protocolBreak,
}
}
@@ -602,7 +619,7 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
}
}
-func (c *ConnectionManager) SkipAllDataAnalyze(conID, ranID uint64) {
+func (c *ConnectionManager) SkipAllDataAnalyzeAndDowngradeProtocol(conID,
ranID uint64) {
var activateConn ActiveConnection
if err := c.activeConnectionMap.Lookup(conID, &activateConn); err !=
nil {
if errors.Is(err, ebpf.ErrKeyNotExist) {
@@ -620,6 +637,16 @@ func (c *ConnectionManager) SkipAllDataAnalyze(conID,
ranID uint64) {
if err := c.activeConnectionMap.Update(conID, activateConn,
ebpf.UpdateAny); err != nil {
log.Warnf("failed to update the active connection: %d-%d",
conID, ranID)
}
+
+ connectionKey := fmt.Sprintf("%d_%d", conID, ranID)
+ data, exist := c.connections.Get(connectionKey)
+ if exist {
+ connection := data.(*ConnectionInfo)
+ connection.ProtocolBreak = true
+ } else {
+ // setting to the protocol break map for encase the runner not
starting building logs
+ c.connectionProtocolBreakMap.Set(connectionKey, true,
time.Minute)
+ }
}
func getSocketPairFromConnectEvent(event events.Event)
(*events.SocketConnectEvent, *ip.SocketPair) {