This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 16b0438 Fix wrong protocol break handle logic (#182)
16b0438 is described below
commit 16b04387fc50d93b2e81901f2eefae80cdf20863
Author: mrproliu <[email protected]>
AuthorDate: Tue Mar 4 19:53:53 2025 +0800
Fix wrong protocol break handle logic (#182)
---
pkg/accesslog/collector/protocols/http2.go | 4 ++--
pkg/accesslog/common/connection.go | 26 ++++++++++++++------------
2 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go
index 74b14f5..0ccdf00 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -136,7 +136,7 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
// if the protocol break, then stop the loop and notify the caller to skip analyze all data(just sending the detail)
if protocolBreak {
- http2Log.Warnf("the HTTP/2 protocol break, maybe not tracing the connection from beginning, skip all data analyze in this connection, "+
+ http2Log.Debugf("the HTTP/2 protocol break, maybe not tracing the connection from beginning, skip all data analyze in this connection, "+
"connection ID: %d", http2Metrics.ConnectionID)
helper.ProtocolBreak = true
r.analyzer.OnProtocolBreak(connection, http2Metrics)
@@ -231,7 +231,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(connection *PartitionConnect
return
}
if time.Since(host.Time(socketBuffer.StartTime())) >
maxHTTP2StreamingTime {
- http2Log.Infof("detect the HTTP/2 stream is too long, split the
stream, connection ID: %d, stream ID: %d, headers: %v",
+ http2Log.Debugf("detect the HTTP/2 stream is too long, split
the stream, connection ID: %d, stream ID: %d, headers: %v",
metrics.ConnectionID, id, streaming.ReqHeader)
_ = r.analyzer.HandleWholeStream(connection, streaming)
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index 9289b9e..fddc2f3 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -244,9 +244,9 @@ func (c *ConnectionManager) Find(event events.Event)
*ConnectionInfo {
c.connections.Set(connectionKey, connection)
if log.Enable(logrus.DebugLevel) {
log.Debugf("building flushing connection, connection
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d, "+
- "local address: %s, remote address: %s",
+ "local address: %s, remote address: %s,
protocol: %s",
e.GetConnectionID(), e.GetRandomID(),
socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort,
- localAddress.String(), remoteAddress.String())
+ localAddress.String(), remoteAddress.String(),
connection.RPCConnection.Protocol.String())
}
c.connectionPostHandle(connection, event)
return connection
@@ -620,6 +620,18 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished()
{
}
func (c *ConnectionManager) SkipAllDataAnalyzeAndDowngradeProtocol(conID,
ranID uint64) {
+ // setting connection protocol is break
+ connectionKey := fmt.Sprintf("%d_%d", conID, ranID)
+ data, exist := c.connections.Get(connectionKey)
+ if exist {
+ connection := data.(*ConnectionInfo)
+ connection.ProtocolBreak = true
+ } else {
+ // setting to the protocol break map for encase the runner not starting building logs
+ c.connectionProtocolBreakMap.Set(connectionKey, true,
time.Minute)
+ }
+
+ // setting the connection skip data upload
var activateConn ActiveConnection
if err := c.activeConnectionMap.Lookup(conID, &activateConn); err !=
nil {
if errors.Is(err, ebpf.ErrKeyNotExist) {
@@ -637,16 +649,6 @@ func (c *ConnectionManager)
SkipAllDataAnalyzeAndDowngradeProtocol(conID, ranID
if err := c.activeConnectionMap.Update(conID, activateConn,
ebpf.UpdateAny); err != nil {
log.Warnf("failed to update the active connection: %d-%d",
conID, ranID)
}
-
- connectionKey := fmt.Sprintf("%d_%d", conID, ranID)
- data, exist := c.connections.Get(connectionKey)
- if exist {
- connection := data.(*ConnectionInfo)
- connection.ProtocolBreak = true
- } else {
- // setting to the protocol break map for encase the runner not starting building logs
- c.connectionProtocolBreakMap.Set(connectionKey, true,
time.Minute)
- }
}
func getSocketPairFromConnectEvent(event events.Event)
(*events.SocketConnectEvent, *ip.SocketPair) {