This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 045ace9  Refector HTTP protocol analyzer in access log module (#178)
045ace9 is described below

commit 045ace936bd854de9e97c4c763dc166670e39789
Author: mrproliu <[email protected]>
AuthorDate: Fri Feb 14 15:34:51 2025 +0800

    Refector HTTP protocol analyzer in access log module (#178)
---
 pkg/accesslog/collector/protocols/http1.go | 19 +++++++++++--------
 pkg/accesslog/collector/protocols/http2.go |  6 ++++--
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index 227422f..9ad0486 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -36,7 +36,8 @@ import (
 var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
 var http1AnalyzeMaxRetryCount = 3
 
-type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, 
response *reader.Response) error
+type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, connection 
*PartitionConnection,
+       request *reader.Request, response *reader.Response) error
 
 type HTTP1Protocol struct {
        ctx     *common.AccessLogContext
@@ -82,7 +83,7 @@ func (p *HTTP1Protocol) Analyze(connection 
*PartitionConnection, _ *AnalyzeHelpe
        buf := connection.Buffer(enums.ConnectionProtocolHTTP)
        http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: 
%d, random ID: %d, data len: %d",
                metrics.ConnectionID, metrics.RandomID, buf.DataLength())
-       p.handleUnFinishedEvents(metrics)
+       p.handleUnFinishedEvents(metrics, connection)
        buf.ResetForLoopReading()
        for {
                if !buf.PrepareForReading() {
@@ -103,7 +104,7 @@ func (p *HTTP1Protocol) Analyze(connection 
*PartitionConnection, _ *AnalyzeHelpe
                case reader.MessageTypeRequest:
                        result, err = p.handleRequest(metrics, buf)
                case reader.MessageTypeResponse:
-                       result, err = p.handleResponse(metrics, buf)
+                       result, err = p.handleResponse(metrics, connection, buf)
                case reader.MessageTypeUnknown:
                        result = enums.ParseResultSkipPackage
                }
@@ -148,7 +149,8 @@ func (p *HTTP1Protocol) handleRequest(metrics 
*HTTP1Metrics, buf *buffer.Buffer)
        return result, nil
 }
 
-func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b 
*buffer.Buffer) (enums.ParseResult, error) {
+func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, connection 
*PartitionConnection,
+       b *buffer.Buffer) (enums.ParseResult, error) {
        request := metrics.findMatchesRequest(b.Position().DataID(), 
b.Position().PrevDataID())
        if request == nil {
                log.Debugf("cannot found request for response, skip response, 
connection ID: %d, random ID: %d, "+
@@ -172,7 +174,7 @@ func (p *HTTP1Protocol) handleResponse(metrics 
*HTTP1Metrics, b *buffer.Buffer)
        }
 
        // getting the request and response, then send to the forwarder
-       if analyzeError := p.analyze(metrics, request, response); analyzeError 
!= nil {
+       if analyzeError := p.analyze(metrics, connection, request, response); 
analyzeError != nil {
                p.appendAnalyzeUnFinished(metrics, request, response)
        }
        return enums.ParseResultSuccess, nil
@@ -186,10 +188,10 @@ func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics 
*HTTP1Metrics, request *
        })
 }
 
-func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) {
+func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics, connection 
*PartitionConnection) {
        for element := m.analyzeUnFinished.Front(); element != nil; {
                unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
-               err := p.analyze(m, unFinished.request, unFinished.response)
+               err := p.analyze(m, connection, unFinished.request, 
unFinished.response)
                if err != nil {
                        unFinished.retryCount++
                        if unFinished.retryCount < http1AnalyzeMaxRetryCount {
@@ -205,7 +207,8 @@ func (p *HTTP1Protocol) handleUnFinishedEvents(m 
*HTTP1Metrics) {
        }
 }
 
-func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) error {
+func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection 
*PartitionConnection,
+       request *reader.Request, response *reader.Response) error {
        details := make([]events.SocketDetail, 0)
        var allInclude = true
        var idRange *buffer.DataIDRange
diff --git a/pkg/accesslog/collector/protocols/http2.go 
b/pkg/accesslog/collector/protocols/http2.go
index f048c14..673088e 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -77,6 +77,7 @@ type HTTP2Streaming struct {
        Status           int
        RespHeaderBuffer *buffer.Buffer
        RespBodyBuffer   *buffer.Buffer
+       connection       *PartitionConnection
 }
 
 func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) 
ProtocolMetrics {
@@ -114,7 +115,7 @@ func (r *HTTP2Protocol) Analyze(connection 
*PartitionConnection, helper *Analyze
                var result enums.ParseResult
                switch header.Type {
                case http2.FrameHeaders:
-                       result, protocolBreak, _ = r.handleHeader(&header, 
startPosition, http2Metrics, buf)
+                       result, protocolBreak, _ = r.handleHeader(connection, 
&header, startPosition, http2Metrics, buf)
                case http2.FrameData:
                        result, protocolBreak, _ = r.handleData(&header, 
startPosition, http2Metrics, buf)
                default:
@@ -158,7 +159,7 @@ func (r *HTTP2Protocol) ForProtocol() 
enums.ConnectionProtocol {
        return enums.ConnectionProtocolHTTP2
 }
 
-func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos 
*buffer.Position,
+func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header 
*http2.FrameHeader, startPos *buffer.Position,
        metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, 
error) {
        bytes := make([]byte, header.Length)
        if err := buf.ReadUntilBufferFull(bytes); err != nil {
@@ -177,6 +178,7 @@ func (r *HTTP2Protocol) handleHeader(header 
*http2.FrameHeader, startPos *buffer
                        ReqHeader:       headers,
                        RespHeader:      make(map[string]string),
                        ReqHeaderBuffer: buf.Slice(true, startPos, 
buf.Position()),
+                       connection:      connection,
                }
                metrics.streams[header.StreamID] = streaming
                return enums.ParseResultSuccess, false, nil

Reply via email to