This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 1641294  Downgrade the protocol of connection when protocol break 
(#181)
1641294 is described below

commit 16412947893c1026ee35a957b4c1ea8f0d32699f
Author: mrproliu <[email protected]>
AuthorDate: Tue Mar 4 12:02:01 2025 +0800

    Downgrade the protocol of connection when protocol break (#181)
---
 CHANGES.md                                 |  1 +
 bpf/include/protocol_analyzer.h            | 12 ++++-
 pkg/accesslog/collector/protocols/http1.go | 26 +++++++----
 pkg/accesslog/collector/protocols/http2.go | 74 +++++++++++++++++-------------
 pkg/accesslog/collector/protocols/queue.go |  5 +-
 pkg/accesslog/common/connection.go         | 55 ++++++++++++++++------
 6 files changed, 114 insertions(+), 59 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 16fc6fd..f35c28c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
 * Reduce unessential `conntrack` query when detect new connection.
 * Reduce CPU and memory usage in the access log module.
 * Reduce handle connection event time in the access log module.
+* Downgrade the protocol of the connection when the protocol breaks in the access log 
module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/bpf/include/protocol_analyzer.h b/bpf/include/protocol_analyzer.h
index 7efc85e..7103e4f 100644
--- a/bpf/include/protocol_analyzer.h
+++ b/bpf/include/protocol_analyzer.h
@@ -124,6 +124,10 @@ static __inline __u32 infer_http2_message(const char* buf, 
size_t count) {
         bpf_probe_read(frame, sizeof(frame), buf + frameOffset);
         frameOffset += (bpf_ntohl(*(__u32 *) frame) >> 8) + kFrameBasicSize;
 
+        // the frame type only accepts 0x00 - 0x09
+        if (frame[3] > 0x09) {
+            return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+        }
         // is header frame
         if (frame[3] != kFrameTypeHeader) {
             continue;
@@ -135,9 +139,15 @@ static __inline __u32 infer_http2_message(const char* buf, 
size_t count) {
             return CONNECTION_MESSAGE_TYPE_UNKNOWN;
         }
 
+        // stream ID cannot be 0
+        __u32 streamID = ((frame[5] & 0x7F) << 24) | (frame[6] << 16) | 
(frame[7] << 8) | frame[8];
+        if (streamID == 0) {
+            return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+        }
+
         // locate the header block fragment offset
         headerBlockFragmentOffset = kFrameBasicSize;
-        if (frame[4] & 0x20) {  // PADDED flag is set
+        if (frame[4] & 0x08) {  // PADDED flag is set
             headerBlockFragmentOffset += 1;
         }
         if (frame[4] & 0x20) {  // PRIORITY flag is set
diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index 3039f00..695e9c4 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -36,21 +36,24 @@ import (
 var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
 var http1AnalyzeMaxRetryCount = 3
 
-type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, connection 
*PartitionConnection,
-       request *reader.Request, response *reader.Response) error
+type HTTP1ProtocolAnalyzer interface {
+       HandleHTTPData(metrics *HTTP1Metrics, connection *PartitionConnection,
+               request *reader.Request, response *reader.Response) error
+       OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection)
+}
 
 type HTTP1Protocol struct {
-       ctx     *common.AccessLogContext
-       analyze HTTP1ProtocolAnalyze
-       reader  *reader.Reader
+       ctx      *common.AccessLogContext
+       analyzer HTTP1ProtocolAnalyzer
+       reader   *reader.Reader
 }
 
-func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze 
HTTP1ProtocolAnalyze) *HTTP1Protocol {
+func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze 
HTTP1ProtocolAnalyzer) *HTTP1Protocol {
        protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
        if analyze == nil {
-               protocol.analyze = protocol.HandleHTTPData
+               protocol.analyzer = protocol
        } else {
-               protocol.analyze = analyze
+               protocol.analyzer = analyze
        }
        return protocol
 }
@@ -174,7 +177,7 @@ func (p *HTTP1Protocol) handleResponse(metrics 
*HTTP1Metrics, connection *Partit
        }
 
        // getting the request and response, then send to the forwarder
-       if analyzeError := p.analyze(metrics, connection, request, response); 
analyzeError != nil {
+       if analyzeError := p.analyzer.HandleHTTPData(metrics, connection, 
request, response); analyzeError != nil {
                p.appendAnalyzeUnFinished(metrics, request, response)
        }
        return enums.ParseResultSuccess, nil
@@ -191,7 +194,7 @@ func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics 
*HTTP1Metrics, request *
 func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics, connection 
*PartitionConnection) {
        for element := m.analyzeUnFinished.Front(); element != nil; {
                unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
-               err := p.analyze(m, connection, unFinished.request, 
unFinished.response)
+               err := p.analyzer.HandleHTTPData(m, connection, 
unFinished.request, unFinished.response)
                if err != nil {
                        unFinished.retryCount++
                        if unFinished.retryCount < http1AnalyzeMaxRetryCount {
@@ -266,6 +269,9 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics 
*HTTP1Metrics, connection *Partit
        return nil
 }
 
+func (p *HTTP1Protocol) OnProtocolBreak(metrics *HTTP1Metrics, connection 
*PartitionConnection) {
+}
+
 func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
        if ioReader != nil {
                _ = ioReader.Close()
diff --git a/pkg/accesslog/collector/protocols/http2.go 
b/pkg/accesslog/collector/protocols/http2.go
index af81d92..74b14f5 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -43,29 +43,32 @@ var maxHTTP2StreamingTime = time.Minute * 3
 
 var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
 
-type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error
+type HTTP2StreamAnalyzer interface {
+       HandleWholeStream(connection *PartitionConnection, stream 
*HTTP2Streaming) error
+       OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics)
+}
 
 type HTTP2Protocol struct {
-       ctx     *common.AccessLogContext
-       analyze HTTP2StreamAnalyze
+       ctx      *common.AccessLogContext
+       analyzer HTTP2StreamAnalyzer
 }
 
-func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyze 
HTTP2StreamAnalyze) *HTTP2Protocol {
+func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyzer 
HTTP2StreamAnalyzer) *HTTP2Protocol {
        protocol := &HTTP2Protocol{ctx: ctx}
-       if analyze == nil {
-               protocol.analyze = protocol.handleWholeStream
+       if analyzer == nil {
+               protocol.analyzer = protocol
        } else {
-               protocol.analyze = analyze
+               protocol.analyzer = analyzer
        }
        return protocol
 }
 
 type HTTP2Metrics struct {
-       connectionID uint64
-       randomID     uint64
-       hpackDecoder *hpack.Decoder
+       ConnectionID uint64
+       RandomID     uint64
+       HpackDecoder *hpack.Decoder
 
-       streams map[uint32]*HTTP2Streaming
+       Streams map[uint32]*HTTP2Streaming
 }
 
 type HTTP2Streaming struct {
@@ -82,10 +85,10 @@ type HTTP2Streaming struct {
 
 func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) 
ProtocolMetrics {
        return &HTTP2Metrics{
-               connectionID: connectionID,
-               randomID:     randomID,
-               hpackDecoder: hpack.NewDecoder(4096, nil),
-               streams:      make(map[uint32]*HTTP2Streaming),
+               ConnectionID: connectionID,
+               RandomID:     randomID,
+               HpackDecoder: hpack.NewDecoder(4096, nil),
+               Streams:      make(map[uint32]*HTTP2Streaming),
        }
 }
 
@@ -93,7 +96,7 @@ func (r *HTTP2Protocol) Analyze(connection 
*PartitionConnection, helper *Analyze
        http2Metrics := 
connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
        buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
        http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: 
%d, random ID: %d",
-               http2Metrics.connectionID, http2Metrics.randomID)
+               http2Metrics.ConnectionID, http2Metrics.RandomID)
        buf.ResetForLoopReading()
        for {
                if !buf.PrepareForReading() {
@@ -115,9 +118,9 @@ func (r *HTTP2Protocol) Analyze(connection 
*PartitionConnection, helper *Analyze
                var result enums.ParseResult
                switch header.Type {
                case http2.FrameHeaders:
-                       result, protocolBreak, _ = r.handleHeader(connection, 
&header, startPosition, http2Metrics, buf)
+                       result, protocolBreak, _ = r.HandleHeader(connection, 
&header, startPosition, http2Metrics, buf)
                case http2.FrameData:
-                       result, protocolBreak, _ = r.handleData(&header, 
startPosition, http2Metrics, buf)
+                       result, protocolBreak, _ = r.HandleData(connection, 
&header, startPosition, http2Metrics, buf)
                default:
                        tmp := make([]byte, header.Length)
                        if err := buf.ReadUntilBufferFull(tmp); err != nil {
@@ -134,8 +137,9 @@ func (r *HTTP2Protocol) Analyze(connection 
*PartitionConnection, helper *Analyze
                // if the protocol break, then stop the loop and notify the 
caller to skip analyze all data(just sending the detail)
                if protocolBreak {
                        http2Log.Warnf("the HTTP/2 protocol break, maybe not 
tracing the connection from beginning, skip all data analyze in this 
connection, "+
-                               "connection ID: %d", http2Metrics.connectionID)
+                               "connection ID: %d", http2Metrics.ConnectionID)
                        helper.ProtocolBreak = true
+                       r.analyzer.OnProtocolBreak(connection, http2Metrics)
                        break
                }
 
@@ -159,19 +163,19 @@ func (r *HTTP2Protocol) ForProtocol() 
enums.ConnectionProtocol {
        return enums.ConnectionProtocolHTTP2
 }
 
-func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header 
*http2.FrameHeader, startPos *buffer.Position,
+func (r *HTTP2Protocol) HandleHeader(connection *PartitionConnection, header 
*http2.FrameHeader, startPos *buffer.Position,
        metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, 
error) {
        bytes := make([]byte, header.Length)
        if err := buf.ReadUntilBufferFull(bytes); err != nil {
-               return enums.ParseResultSkipPackage, false, err
+               return enums.ParseResultSkipPackage, true, err
        }
-       headerData, err := metrics.hpackDecoder.DecodeFull(bytes)
+       headerData, err := metrics.HpackDecoder.DecodeFull(bytes)
        if err != nil {
                // reading the header failure, maybe not tracing the connection 
from beginning
                return enums.ParseResultSkipPackage, true, err
        }
        // saving stream
-       streaming := metrics.streams[header.StreamID]
+       streaming := metrics.Streams[header.StreamID]
        headers := r.parseHeaders(headerData)
        if streaming == nil {
                streaming = &HTTP2Streaming{
@@ -180,7 +184,7 @@ func (r *HTTP2Protocol) handleHeader(connection 
*PartitionConnection, header *ht
                        ReqHeaderBuffer: buf.Slice(true, startPos, 
buf.Position()),
                        Connection:      connection,
                }
-               metrics.streams[header.StreamID] = streaming
+               metrics.Streams[header.StreamID] = streaming
                return enums.ParseResultSuccess, false, nil
        }
 
@@ -207,14 +211,15 @@ func (r *HTTP2Protocol) handleHeader(connection 
*PartitionConnection, header *ht
        // is end of stream and in the response
        if header.Flags.Has(http2.FlagHeadersEndStream) {
                // should be end of the stream and send to the protocol
-               _ = r.analyze(streaming)
+               _ = r.analyzer.HandleWholeStream(connection, streaming)
                // delete streaming
-               delete(metrics.streams, header.StreamID)
+               delete(metrics.Streams, header.StreamID)
        }
        return enums.ParseResultSuccess, false, nil
 }
 
-func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id 
uint32, streaming *HTTP2Streaming) {
+func (r *HTTP2Protocol) validateIsStreamOpenTooLong(connection 
*PartitionConnection,
+       metrics *HTTP2Metrics, id uint32, streaming *HTTP2Streaming) {
        // if in the response mode or the request body is not nil, then skip
        if streaming.IsInResponse || streaming.ReqBodyBuffer == nil {
                return
@@ -227,9 +232,9 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics 
*HTTP2Metrics, id ui
        }
        if time.Since(host.Time(socketBuffer.StartTime())) > 
maxHTTP2StreamingTime {
                http2Log.Infof("detect the HTTP/2 stream is too long, split the 
stream, connection ID: %d, stream ID: %d, headers: %v",
-                       metrics.connectionID, id, streaming.ReqHeader)
+                       metrics.ConnectionID, id, streaming.ReqHeader)
 
-               _ = r.analyze(streaming)
+               _ = r.analyzer.HandleWholeStream(connection, streaming)
 
                // clean sent buffers
                if streaming.ReqBodyBuffer != nil {
@@ -238,7 +243,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics 
*HTTP2Metrics, id ui
        }
 }
 
-func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
+func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream 
*HTTP2Streaming) error {
        details := make([]events.SocketDetail, 0)
        var allInclude = true
        var idRange *buffer.DataIDRange
@@ -285,6 +290,9 @@ func (r *HTTP2Protocol) handleWholeStream(stream 
*HTTP2Streaming) error {
        return nil
 }
 
+func (r *HTTP2Protocol) OnProtocolBreak(connection *PartitionConnection, 
metrics *HTTP2Metrics) {
+}
+
 func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) 
v3.AccessLogHTTPProtocolRequestMethod {
        method := streaming.ReqHeader[":method"]
        if method == "" {
@@ -318,10 +326,10 @@ func (r *HTTP2Protocol) AppendHeaders(exist, needAppends 
map[string]string) {
        }
 }
 
-func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos 
*buffer.Position,
+func (r *HTTP2Protocol) HandleData(connection *PartitionConnection, header 
*http2.FrameHeader, startPos *buffer.Position,
        metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, 
error) {
        bytes := make([]byte, header.Length)
-       streaming := metrics.streams[header.StreamID]
+       streaming := metrics.Streams[header.StreamID]
        if streaming == nil {
                // cannot found the stream, maybe not tracing the connection 
from beginning
                return enums.ParseResultSkipPackage, true, nil
@@ -335,7 +343,7 @@ func (r *HTTP2Protocol) handleData(header 
*http2.FrameHeader, startPos *buffer.P
                streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, 
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
        }
 
-       r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
+       r.validateIsStreamOpenTooLong(connection, metrics, header.StreamID, 
streaming)
        return enums.ParseResultSuccess, false, nil
 }
 
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index b336f9d..0c746d3 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -345,8 +345,11 @@ func (p *PartitionContext) 
processConnectionEvents(connection *PartitionConnecti
        if helper.ProtocolBreak {
                // notify the connection manager to skip analyze all data(just 
sending the detail)
                connection.skipAllDataAnalyze = true
-               
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, 
connection.randomID)
+               
p.context.ConnectionMgr.SkipAllDataAnalyzeAndDowngradeProtocol(connection.connectionID,
 connection.randomID)
                for _, buf := range connection.dataBuffers {
+                       for e := buf.BuildDetails().Front(); e != nil; e = 
e.Next() {
+                               
forwarder.SendTransferNoProtocolEvent(p.context, e.Value.(events.SocketDetail))
+                       }
                        buf.Clean()
                }
        }
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 1c0a86e..9289b9e 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -42,6 +42,8 @@ import (
 
        cmap "github.com/orcaman/concurrent-map"
 
+       "k8s.io/apimachinery/pkg/util/cache"
+
        v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
@@ -103,6 +105,8 @@ type ConnectionManager struct {
        flushListeners []FlusherListener
 
        connectTracker *ip.ConnTrack
+
+       connectionProtocolBreakMap *cache.Expiring
 }
 
 func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) {
@@ -126,6 +130,7 @@ type ConnectionInfo struct {
        Socket             *ip.SocketPair
        LastCheckExistTime time.Time
        DeleteAfter        *time.Time
+       ProtocolBreak      bool
 }
 
 func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader 
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
@@ -134,16 +139,17 @@ func NewConnectionManager(config *Config, moduleMgr 
*module.Manager, bpfLoader *
                log.Warnf("cannot create the connection tracker, %v", err)
        }
        mgr := &ConnectionManager{
-               moduleMgr:           moduleMgr,
-               processOP:           
moduleMgr.FindModule(process.ModuleName).(process.Operator),
-               connections:         cmap.New(),
-               localIPWithPid:      make(map[string]int32),
-               monitoringProcesses: make(map[int32][]api.ProcessInterface),
-               processMonitorMap:   bpfLoader.ProcessMonitorControl,
-               activeConnectionMap: bpfLoader.ActiveConnectionMap,
-               monitorFilter:       filter,
-               flushListeners:      make([]FlusherListener, 0),
-               connectTracker:      track,
+               moduleMgr:                  moduleMgr,
+               processOP:                  
moduleMgr.FindModule(process.ModuleName).(process.Operator),
+               connections:                cmap.New(),
+               localIPWithPid:             make(map[string]int32),
+               monitoringProcesses:        
make(map[int32][]api.ProcessInterface),
+               processMonitorMap:          bpfLoader.ProcessMonitorControl,
+               activeConnectionMap:        bpfLoader.ActiveConnectionMap,
+               monitorFilter:              filter,
+               flushListeners:             make([]FlusherListener, 0),
+               connectTracker:             track,
+               connectionProtocolBreakMap: cache.NewExpiring(),
        }
        return mgr
 }
@@ -234,7 +240,7 @@ func (c *ConnectionManager) Find(event events.Event) 
*ConnectionInfo {
                if localAddress == nil || remoteAddress == nil {
                        return nil
                }
-               connection := c.buildConnection(e, socket, localAddress, 
remoteAddress)
+               connection := c.buildConnection(e, socket, localAddress, 
remoteAddress, connectionKey)
                c.connections.Set(connectionKey, connection)
                if log.Enable(logrus.DebugLevel) {
                        log.Debugf("building flushing connection, connection 
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d, "+
@@ -284,7 +290,8 @@ func (c *ConnectionManager) connectionPostHandle(connection 
*ConnectionInfo, eve
                if e.GetSSL() == 1 && connection.RPCConnection.TlsMode == 
v3.AccessLogConnectionTLSMode_Plain {
                        tlsMode = v3.AccessLogConnectionTLSMode_TLS
                }
-               if e.GetProtocol() != enums.ConnectionProtocolUnknown && 
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
+               if !connection.ProtocolBreak && e.GetProtocol() != 
enums.ConnectionProtocolUnknown &&
+                       connection.RPCConnection.Protocol == 
v3.AccessLogProtocolType_TCP {
                        switch e.GetProtocol() {
                        case enums.ConnectionProtocolHTTP:
                                protocol = v3.AccessLogProtocolType_HTTP_1
@@ -292,6 +299,9 @@ func (c *ConnectionManager) connectionPostHandle(connection 
*ConnectionInfo, eve
                                protocol = v3.AccessLogProtocolType_HTTP_2
                        }
                }
+               if connection.ProtocolBreak && 
connection.RPCConnection.Protocol != v3.AccessLogProtocolType_TCP {
+                       protocol = v3.AccessLogProtocolType_TCP
+               }
                c.rebuildRPCConnectionWithTLSModeAndProtocol(connection, 
tlsMode, protocol)
        }
 
@@ -335,7 +345,7 @@ func (c *ConnectionManager) ProcessIsDetectBy(pid uint32, 
detectType api.Process
 }
 
 func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent, 
socket *ip.SocketPair,
-       local, remote *v3.ConnectionAddress) *ConnectionInfo {
+       local, remote *v3.ConnectionAddress, conKey string) *ConnectionInfo {
        var role v32.DetectPoint
        switch socket.Role {
        case enums.ConnectionRoleClient:
@@ -350,6 +360,12 @@ func (c *ConnectionManager) buildConnection(event 
*events.SocketConnectEvent, so
                TlsMode:  v3.AccessLogConnectionTLSMode_Plain,
                Protocol: v3.AccessLogProtocolType_TCP,
        }
+       val, exist := c.connectionProtocolBreakMap.Get(conKey)
+       protocolBreak := false
+       if exist {
+               protocolBreak = val.(bool)
+               c.connectionProtocolBreakMap.Delete(conKey)
+       }
        return &ConnectionInfo{
                ConnectionID:       event.ConID,
                RandomID:           event.RandomID,
@@ -357,6 +373,7 @@ func (c *ConnectionManager) buildConnection(event 
*events.SocketConnectEvent, so
                PID:                event.PID,
                Socket:             socket,
                LastCheckExistTime: time.Now(),
+               ProtocolBreak:      protocolBreak,
        }
 }
 
@@ -602,7 +619,7 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
        }
 }
 
-func (c *ConnectionManager) SkipAllDataAnalyze(conID, ranID uint64) {
+func (c *ConnectionManager) SkipAllDataAnalyzeAndDowngradeProtocol(conID, 
ranID uint64) {
        var activateConn ActiveConnection
        if err := c.activeConnectionMap.Lookup(conID, &activateConn); err != 
nil {
                if errors.Is(err, ebpf.ErrKeyNotExist) {
@@ -620,6 +637,16 @@ func (c *ConnectionManager) SkipAllDataAnalyze(conID, 
ranID uint64) {
        if err := c.activeConnectionMap.Update(conID, activateConn, 
ebpf.UpdateAny); err != nil {
                log.Warnf("failed to update the active connection: %d-%d", 
conID, ranID)
        }
+
+       connectionKey := fmt.Sprintf("%d_%d", conID, ranID)
+       data, exist := c.connections.Get(connectionKey)
+       if exist {
+               connection := data.(*ConnectionInfo)
+               connection.ProtocolBreak = true
+       } else {
+               // save to the protocol break map in case the runner has not 
started building logs yet
+               c.connectionProtocolBreakMap.Set(connectionKey, true, 
time.Minute)
+       }
 }
 
 func getSocketPairFromConnectEvent(event events.Event) 
(*events.SocketConnectEvent, *ip.SocketPair) {

Reply via email to