This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new e522b40  Refactor the protocol analyze logical (#156)
e522b40 is described below

commit e522b40baa8ec63f3154fb8b763bc57377a6bf74
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 19 12:47:22 2024 +0900

    Refactor the protocol analyze logical (#156)
---
 pkg/accesslog/collector/protocols/connection.go    |  69 ++++++++++
 pkg/accesslog/collector/protocols/http.go          |  46 +++++++
 pkg/accesslog/collector/protocols/http1.go         | 118 ++++++++---------
 pkg/accesslog/collector/protocols/http2.go         | 145 +++++++++++----------
 pkg/accesslog/collector/protocols/protocol.go      |  20 ++-
 pkg/accesslog/collector/protocols/queue.go         |  86 ++++++------
 pkg/profiling/task/network/analyze/base/metrics.go |   2 +-
 .../task/network/analyze/layer4/listener.go        |   5 +-
 .../analyze/layer7/protocols/base/analyzer.go      |   2 +-
 .../analyze/layer7/protocols/http1/metrics.go      |   2 +-
 .../network/analyze/layer7/protocols/protocols.go  |   4 +-
 pkg/tools/enums/protocol.go                        |  26 ++++
 pkg/tools/enums/socket.go                          |  21 ---
 13 files changed, 329 insertions(+), 217 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/connection.go 
b/pkg/accesslog/collector/protocols/connection.go
new file mode 100644
index 0000000..7ae9325
--- /dev/null
+++ b/pkg/accesslog/collector/protocols/connection.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package protocols
+
+import (
+       "time"
+
+       "github.com/apache/skywalking-rover/pkg/accesslog/common"
+       "github.com/apache/skywalking-rover/pkg/accesslog/events"
+       "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
+       "github.com/apache/skywalking-rover/pkg/tools/buffer"
+       "github.com/apache/skywalking-rover/pkg/tools/enums"
+)
+
+type PartitionConnection struct {
+       connectionID, randomID uint64
+       dataBuffer             *buffer.Buffer
+       protocol               map[enums.ConnectionProtocol]bool
+       protocolAnalyzer       map[enums.ConnectionProtocol]Protocol
+       protocolMetrics        map[enums.ConnectionProtocol]ProtocolMetrics
+       closed                 bool
+       closeCallback          common.ConnectionProcessFinishCallback
+       skipAllDataAnalyze     bool
+       lastCheckCloseTime     time.Time
+}
+
+func (p *PartitionConnection) Metrics(protocol enums.ConnectionProtocol) 
ProtocolMetrics {
+       return p.protocolMetrics[protocol]
+}
+
+func (p *PartitionConnection) IsExistProtocol(protocol 
enums.ConnectionProtocol) bool {
+       _, exist := p.protocol[protocol]
+       return exist
+}
+
+func (p *PartitionConnection) Buffer() *buffer.Buffer {
+       return p.dataBuffer
+}
+
+func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, 
detail events.SocketDetail) {
+       if p.skipAllDataAnalyze {
+               // if the connection is already skip all data analyze, then 
just send the detail event
+               forwarder.SendTransferNoProtocolEvent(ctx, detail)
+               return
+       }
+       p.dataBuffer.AppendDetailEvent(detail)
+}
+
+func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) {
+       if p.skipAllDataAnalyze {
+               return
+       }
+       p.dataBuffer.AppendDataEvent(data)
+}
diff --git a/pkg/accesslog/collector/protocols/http.go 
b/pkg/accesslog/collector/protocols/http.go
new file mode 100644
index 0000000..3b7d076
--- /dev/null
+++ b/pkg/accesslog/collector/protocols/http.go
@@ -0,0 +1,46 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package protocols
+
+import v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+
+// TransformHTTPMethod transforms the http method to the 
v3.AccessLogHTTPProtocolRequestMethod
+func TransformHTTPMethod(method string) v3.AccessLogHTTPProtocolRequestMethod {
+       switch method {
+       case "GET":
+               return v3.AccessLogHTTPProtocolRequestMethod_Get
+       case "POST":
+               return v3.AccessLogHTTPProtocolRequestMethod_Post
+       case "PUT":
+               return v3.AccessLogHTTPProtocolRequestMethod_Put
+       case "DELETE":
+               return v3.AccessLogHTTPProtocolRequestMethod_Delete
+       case "HEAD":
+               return v3.AccessLogHTTPProtocolRequestMethod_Head
+       case "OPTIONS":
+               return v3.AccessLogHTTPProtocolRequestMethod_Options
+       case "TRACE":
+               return v3.AccessLogHTTPProtocolRequestMethod_Trace
+       case "CONNECT":
+               return v3.AccessLogHTTPProtocolRequestMethod_Connect
+       case "PATCH":
+               return v3.AccessLogHTTPProtocolRequestMethod_Patch
+       }
+       http1Log.Warnf("unknown http method: %s", method)
+       return v3.AccessLogHTTPProtocolRequestMethod_Get
+}
diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index a33e170..06b1b49 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -34,45 +34,52 @@ import (
 
 var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
 
-func init() {
-       registeredProtocols[enums.ConnectionProtocolHTTP] = func(ctx 
*common.AccessLogContext) Protocol {
-               return &HTTP1Protocol{ctx: ctx}
-       }
-}
+type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, 
response *reader.Response)
 
 type HTTP1Protocol struct {
-       ctx *common.AccessLogContext
+       ctx     *common.AccessLogContext
+       analyze HTTP1ProtocolAnalyze
+}
+
+func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze 
HTTP1ProtocolAnalyze) *HTTP1Protocol {
+       protocol := &HTTP1Protocol{ctx: ctx}
+       if analyze == nil {
+               protocol.analyze = protocol.HandleHTTPData
+       } else {
+               protocol.analyze = analyze
+       }
+       return protocol
 }
 
 type HTTP1Metrics struct {
-       connectionID uint64
-       randomID     uint64
+       ConnectionID uint64
+       RandomID     uint64
 
        halfRequests *list.List
 }
 
 func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) 
ProtocolMetrics {
        return &HTTP1Metrics{
-               connectionID: connectionID,
-               randomID:     randomID,
+               ConnectionID: connectionID,
+               RandomID:     randomID,
                halfRequests: list.New(),
        }
 }
 
-func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer, _ 
*AnalyzeHelper) error {
-       http1Metrics := metrics.(*HTTP1Metrics)
+func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ 
*AnalyzeHelper) error {
+       metrics := 
connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
        http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: 
%d, random ID: %d, data len: %d",
-               http1Metrics.connectionID, http1Metrics.randomID, 
buf.DataLength())
-       buf.ResetForLoopReading()
+               metrics.ConnectionID, metrics.RandomID, 
connection.Buffer().DataLength())
+       connection.Buffer().ResetForLoopReading()
        for {
-               if !buf.PrepareForReading() {
+               if !connection.Buffer().PrepareForReading() {
                        return nil
                }
 
-               messageType, err := reader.IdentityMessageType(buf)
+               messageType, err := 
reader.IdentityMessageType(connection.Buffer())
                if err != nil {
                        http1Log.Debugf("failed to identity message type, %v", 
err)
-                       if buf.SkipCurrentElement() {
+                       if connection.Buffer().SkipCurrentElement() {
                                break
                        }
                        continue
@@ -81,9 +88,9 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf 
*buffer.Buffer, _ *
                var result enums.ParseResult
                switch messageType {
                case reader.MessageTypeRequest:
-                       result, _ = p.handleRequest(metrics, buf)
+                       result, _ = p.handleRequest(metrics, 
connection.Buffer())
                case reader.MessageTypeResponse:
-                       result, _ = p.handleResponse(metrics, buf)
+                       result, _ = p.handleResponse(metrics, 
connection.Buffer())
                case reader.MessageTypeUnknown:
                        result = enums.ParseResultSkipPackage
                }
@@ -91,9 +98,9 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf 
*buffer.Buffer, _ *
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = buf.RemoveReadElements()
+                       finishReading = connection.Buffer().RemoveReadElements()
                case enums.ParseResultSkipPackage:
-                       finishReading = buf.SkipCurrentElement()
+                       finishReading = connection.Buffer().SkipCurrentElement()
                }
 
                if finishReading {
@@ -103,7 +110,11 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, 
buf *buffer.Buffer, _ *
        return nil
 }
 
-func (p *HTTP1Protocol) handleRequest(metrics ProtocolMetrics, buf 
*buffer.Buffer) (enums.ParseResult, error) {
+func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol {
+       return enums.ConnectionProtocolHTTP
+}
+
+func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf 
*buffer.Buffer) (enums.ParseResult, error) {
        req, result, err := reader.ReadRequest(buf, true)
        if err != nil {
                return enums.ParseResultSkipPackage, err
@@ -111,24 +122,23 @@ func (p *HTTP1Protocol) handleRequest(metrics 
ProtocolMetrics, buf *buffer.Buffe
        if result != enums.ParseResultSuccess {
                return result, nil
        }
-       metrics.(*HTTP1Metrics).appendRequestToList(req)
+       metrics.appendRequestToList(req)
        return result, nil
 }
 
-func (p *HTTP1Protocol) handleResponse(metrics ProtocolMetrics, b 
*buffer.Buffer) (enums.ParseResult, error) {
-       http1Metrics := metrics.(*HTTP1Metrics)
-       firstRequest := http1Metrics.halfRequests.Front()
+func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b 
*buffer.Buffer) (enums.ParseResult, error) {
+       firstRequest := metrics.halfRequests.Front()
        if firstRequest == nil {
                return enums.ParseResultSkipPackage, nil
        }
-       request := 
http1Metrics.halfRequests.Remove(firstRequest).(*reader.Request)
+       request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
 
        // parsing response
        response, result, err := reader.ReadResponse(request, b, true)
        defer func() {
                // if parsing response failed, then put the request back to the 
list
                if result != enums.ParseResultSuccess {
-                       http1Metrics.halfRequests.PushFront(request)
+                       metrics.halfRequests.PushFront(request)
                }
        }()
        if err != nil {
@@ -138,31 +148,31 @@ func (p *HTTP1Protocol) handleResponse(metrics 
ProtocolMetrics, b *buffer.Buffer
        }
 
        // getting the request and response, then send to the forwarder
-       p.handleHTTPData(http1Metrics, request, response)
+       p.analyze(metrics, request, response)
        return enums.ParseResultSuccess, nil
 }
 
-func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) {
+func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) {
        detailEvents := make([]events.SocketDetail, 0)
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
request.HeaderBuffer())
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
request.BodyBuffer())
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
response.HeaderBuffer())
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
response.BodyBuffer())
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
request.HeaderBuffer())
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
request.BodyBuffer())
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
response.HeaderBuffer())
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
response.BodyBuffer())
 
        if len(detailEvents) == 0 {
                http1Log.Warnf("cannot found any detail events for HTTP/1.x 
protocol, connection ID: %d, random ID: %d, data id: %d-%d",
-                       metrics.connectionID, metrics.randomID,
+                       metrics.ConnectionID, metrics.RandomID,
                        request.MinDataID(), 
response.BodyBuffer().LastSocketBuffer().DataID())
                return
        }
        http1Log.Debugf("found fully HTTP1 request and response, contains %d 
detail events , connection ID: %d, random ID: %d",
-               len(detailEvents), metrics.connectionID, metrics.randomID)
+               len(detailEvents), metrics.ConnectionID, metrics.RandomID)
        originalRequest := request.Original()
        originalResponse := response.Original()
 
        defer func() {
-               p.closeStream(originalRequest.Body)
-               p.closeStream(originalResponse.Body)
+               p.CloseStream(originalRequest.Body)
+               p.CloseStream(originalResponse.Body)
        }()
        forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
@@ -171,12 +181,11 @@ func (p *HTTP1Protocol) handleHTTPData(metrics 
*HTTP1Metrics, request *reader.Re
                                EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP1,
                                Request: &v3.AccessLogHTTPProtocolRequest{
-                                       Method:             
transformHTTPMethod(originalRequest.Method),
+                                       Method:             
TransformHTTPMethod(originalRequest.Method),
                                        Path:               
originalRequest.URL.Path,
                                        SizeOfHeadersBytes: 
uint64(request.HeaderBuffer().DataSize()),
                                        SizeOfBodyBytes:    
uint64(request.BodyBuffer().DataSize()),
-
-                                       Trace: analyzeTraceInfo(func(key 
string) string {
+                                       Trace: AnalyzeTraceInfo(func(key 
string) string {
                                                return 
originalRequest.Header.Get(key)
                                        }, http1Log),
                                },
@@ -190,37 +199,12 @@ func (p *HTTP1Protocol) handleHTTPData(metrics 
*HTTP1Metrics, request *reader.Re
        })
 }
 
-func (p *HTTP1Protocol) closeStream(ioReader io.Closer) {
+func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
        if ioReader != nil {
                _ = ioReader.Close()
        }
 }
 
-func transformHTTPMethod(method string) v3.AccessLogHTTPProtocolRequestMethod {
-       switch method {
-       case "GET":
-               return v3.AccessLogHTTPProtocolRequestMethod_Get
-       case "POST":
-               return v3.AccessLogHTTPProtocolRequestMethod_Post
-       case "PUT":
-               return v3.AccessLogHTTPProtocolRequestMethod_Put
-       case "DELETE":
-               return v3.AccessLogHTTPProtocolRequestMethod_Delete
-       case "HEAD":
-               return v3.AccessLogHTTPProtocolRequestMethod_Head
-       case "OPTIONS":
-               return v3.AccessLogHTTPProtocolRequestMethod_Options
-       case "TRACE":
-               return v3.AccessLogHTTPProtocolRequestMethod_Trace
-       case "CONNECT":
-               return v3.AccessLogHTTPProtocolRequestMethod_Connect
-       case "PATCH":
-               return v3.AccessLogHTTPProtocolRequestMethod_Patch
-       }
-       http1Log.Warnf("unknown http method: %s", method)
-       return v3.AccessLogHTTPProtocolRequestMethod_Get
-}
-
 func (m *HTTP1Metrics) appendRequestToList(req *reader.Request) {
        if m.halfRequests.Len() == 0 {
                m.halfRequests.PushFront(req)
diff --git a/pkg/accesslog/collector/protocols/http2.go 
b/pkg/accesslog/collector/protocols/http2.go
index a3bf80e..486533c 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -42,14 +42,21 @@ var maxHTTP2StreamingTime = time.Minute * 3
 
 var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
 
-func init() {
-       registeredProtocols[enums.ConnectionProtocolHTTP2] = func(ctx 
*common.AccessLogContext) Protocol {
-               return &HTTP2Protocol{ctx: ctx}
-       }
-}
+type HTTP2StreamAnalyze func(stream *HTTP2Streaming)
 
 type HTTP2Protocol struct {
-       ctx *common.AccessLogContext
+       ctx     *common.AccessLogContext
+       analyze HTTP2StreamAnalyze
+}
+
+func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyze 
HTTP2StreamAnalyze) *HTTP2Protocol {
+       protocol := &HTTP2Protocol{ctx: ctx}
+       if analyze == nil {
+               protocol.analyze = protocol.handleWholeStream
+       } else {
+               protocol.analyze = analyze
+       }
+       return protocol
 }
 
 type HTTP2Metrics struct {
@@ -61,14 +68,14 @@ type HTTP2Metrics struct {
 }
 
 type HTTP2Streaming struct {
-       reqHeader        map[string]string
-       respHeader       map[string]string
-       reqHeaderBuffer  *buffer.Buffer
-       reqBodyBuffer    *buffer.Buffer
-       isInResponse     bool
-       status           int
-       respHeaderBuffer *buffer.Buffer
-       respBodyBuffer   *buffer.Buffer
+       ReqHeader        map[string]string
+       RespHeader       map[string]string
+       ReqHeaderBuffer  *buffer.Buffer
+       ReqBodyBuffer    *buffer.Buffer
+       IsInResponse     bool
+       Status           int
+       RespHeaderBuffer *buffer.Buffer
+       RespBodyBuffer   *buffer.Buffer
 }
 
 func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) 
ProtocolMetrics {
@@ -80,21 +87,21 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID, 
randomID uint64) Protoc
        }
 }
 
-func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer, 
helper *AnalyzeHelper) error {
-       http2Metrics := metrics.(*HTTP2Metrics)
+func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper 
*AnalyzeHelper) error {
+       http2Metrics := 
connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
        http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: 
%d, random ID: %d",
                http2Metrics.connectionID, http2Metrics.randomID)
-       buf.ResetForLoopReading()
+       connection.Buffer().ResetForLoopReading()
        for {
-               if !buf.PrepareForReading() {
+               if !connection.Buffer().PrepareForReading() {
                        return nil
                }
 
-               startPosition := buf.Position()
-               header, err := http2.ReadFrameHeader(buf)
+               startPosition := connection.Buffer().Position()
+               header, err := http2.ReadFrameHeader(connection.Buffer())
                if err != nil {
                        http2Log.Debugf("failed to read frame header, %v", err)
-                       if buf.SkipCurrentElement() {
+                       if connection.Buffer().SkipCurrentElement() {
                                break
                        }
                        continue
@@ -105,12 +112,12 @@ func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics, 
buf *buffer.Buffer, hel
                var result enums.ParseResult
                switch header.Type {
                case http2.FrameHeaders:
-                       result, protocolBreak, _ = r.handleHeader(&header, 
startPosition, http2Metrics, buf)
+                       result, protocolBreak, _ = r.handleHeader(&header, 
startPosition, http2Metrics, connection.Buffer())
                case http2.FrameData:
-                       result, protocolBreak, _ = r.handleData(&header, 
startPosition, http2Metrics, buf)
+                       result, protocolBreak, _ = r.handleData(&header, 
startPosition, http2Metrics, connection.Buffer())
                default:
                        tmp := make([]byte, header.Length)
-                       if err := buf.ReadUntilBufferFull(tmp); err != nil {
+                       if err := connection.Buffer().ReadUntilBufferFull(tmp); 
err != nil {
                                if errors.Is(err, buffer.ErrNotComplete) {
                                        result = enums.ParseResultSkipPackage
                                } else {
@@ -132,9 +139,9 @@ func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics, 
buf *buffer.Buffer, hel
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = buf.RemoveReadElements()
+                       finishReading = connection.Buffer().RemoveReadElements()
                case enums.ParseResultSkipPackage:
-                       finishReading = buf.SkipCurrentElement()
+                       finishReading = connection.Buffer().SkipCurrentElement()
                }
 
                if finishReading {
@@ -145,6 +152,10 @@ func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics, 
buf *buffer.Buffer, hel
        return nil
 }
 
+func (r *HTTP2Protocol) ForProtocol() enums.ConnectionProtocol {
+       return enums.ConnectionProtocolHTTP2
+}
+
 func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos 
*buffer.Position,
        metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, 
error) {
        bytes := make([]byte, header.Length)
@@ -161,9 +172,9 @@ func (r *HTTP2Protocol) handleHeader(header 
*http2.FrameHeader, startPos *buffer
        headers := r.parseHeaders(headerData)
        if streaming == nil {
                streaming = &HTTP2Streaming{
-                       reqHeader:       headers,
-                       respHeader:      make(map[string]string),
-                       reqHeaderBuffer: buf.Slice(true, startPos, 
buf.Position()),
+                       ReqHeader:       headers,
+                       RespHeader:      make(map[string]string),
+                       ReqHeaderBuffer: buf.Slice(true, startPos, 
buf.Position()),
                }
                metrics.streams[header.StreamID] = streaming
                return enums.ParseResultSuccess, false, nil
@@ -171,28 +182,28 @@ func (r *HTTP2Protocol) handleHeader(header 
*http2.FrameHeader, startPos *buffer
 
        status, contains := headers[":status"]
        if contains {
-               streaming.isInResponse = true
+               streaming.IsInResponse = true
                code, err := strconv.ParseInt(status, 10, 64)
                if err != nil {
                        log.Warnf("cannot parse status code: %s", status)
                        code = 200
                }
-               streaming.status = int(code)
+               streaming.Status = int(code)
        }
 
-       if !streaming.isInResponse {
-               r.appendHeaders(streaming.reqHeader, headers)
-               streaming.reqHeaderBuffer = buffer.CombineSlices(true, 
streaming.reqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+       if !streaming.IsInResponse {
+               r.AppendHeaders(streaming.ReqHeader, headers)
+               streaming.ReqHeaderBuffer = buffer.CombineSlices(true, 
streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
                return enums.ParseResultSuccess, false, nil
        }
 
-       r.appendHeaders(streaming.respHeader, headers)
-       streaming.respHeaderBuffer = buffer.CombineSlices(true, 
streaming.respHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+       r.AppendHeaders(streaming.RespHeader, headers)
+       streaming.RespHeaderBuffer = buffer.CombineSlices(true, 
streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
 
        // is end of stream and in the response
        if header.Flags.Has(http2.FlagHeadersEndStream) {
                // should be end of the stream and send to the protocol
-               r.handleWholeStream(streaming)
+               r.analyze(streaming)
                // delete streaming
                delete(metrics.streams, header.StreamID)
        }
@@ -201,91 +212,91 @@ func (r *HTTP2Protocol) handleHeader(header 
*http2.FrameHeader, startPos *buffer
 
 func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id 
uint32, streaming *HTTP2Streaming) {
        // if in the response mode or the request body is not nil, then skip
-       if streaming.isInResponse || streaming.reqBodyBuffer == nil {
+       if streaming.IsInResponse || streaming.ReqBodyBuffer == nil {
                return
        }
 
        // is the body sending too long, then split the stream
-       socketBuffer := streaming.reqBodyBuffer.FirstSocketBuffer()
+       socketBuffer := streaming.ReqBodyBuffer.FirstSocketBuffer()
        if socketBuffer == nil {
                return
        }
        if time.Since(host.Time(socketBuffer.StartTime())) > 
maxHTTP2StreamingTime {
                http2Log.Infof("detect the HTTP/2 stream is too long, split the 
stream, connection ID: %d, stream ID: %d, headers: %v",
-                       metrics.connectionID, id, streaming.reqHeader)
+                       metrics.connectionID, id, streaming.ReqHeader)
 
-               r.handleWholeStream(streaming)
+               r.analyze(streaming)
 
                // clean sent buffers
-               if streaming.reqBodyBuffer != nil {
-                       streaming.reqBodyBuffer.Clean()
+               if streaming.ReqBodyBuffer != nil {
+                       streaming.ReqBodyBuffer.Clean()
                }
        }
 }
 
 func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
        detailEvents := make([]events.SocketDetail, 0)
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
stream.reqHeaderBuffer)
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
stream.reqBodyBuffer)
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
stream.respHeaderBuffer)
-       detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
stream.respBodyBuffer)
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.ReqHeaderBuffer)
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.ReqBodyBuffer)
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.RespHeaderBuffer)
+       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.RespBodyBuffer)
 
        if len(detailEvents) == 0 {
                http2Log.Warnf("cannot found any detail events for HTTP/2 
protocol, data id: %d-%d",
-                       stream.reqHeaderBuffer.FirstSocketBuffer().DataID(), 
stream.respBodyBuffer.LastSocketBuffer().DataID())
+                       stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), 
stream.RespBodyBuffer.LastSocketBuffer().DataID())
                return
        }
 
        forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
-                               StartTime: 
forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer, 
detailEvents[0]).GetStartTime()),
+                               StartTime: 
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, 
detailEvents[0]).GetStartTime()),
                                EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP2,
                                Request: &v3.AccessLogHTTPProtocolRequest{
-                                       Method:             
r.parseHTTPMethod(stream),
-                                       Path:               
stream.reqHeader[":path"],
-                                       SizeOfHeadersBytes: 
r.bufferSizeOfZero(stream.reqHeaderBuffer),
-                                       SizeOfBodyBytes:    
r.bufferSizeOfZero(stream.reqBodyBuffer),
+                                       Method:             
r.ParseHTTPMethod(stream),
+                                       Path:               
stream.ReqHeader[":path"],
+                                       SizeOfHeadersBytes: 
r.BufferSizeOfZero(stream.ReqHeaderBuffer),
+                                       SizeOfBodyBytes:    
r.BufferSizeOfZero(stream.ReqBodyBuffer),
 
-                                       Trace: analyzeTraceInfo(func(key 
string) string {
-                                               return stream.reqHeader[key]
+                                       Trace: AnalyzeTraceInfo(func(key 
string) string {
+                                               return stream.ReqHeader[key]
                                        }, http2Log),
                                },
                                Response: &v3.AccessLogHTTPProtocolResponse{
-                                       StatusCode:         
int32(stream.status),
-                                       SizeOfHeadersBytes: 
r.bufferSizeOfZero(stream.respHeaderBuffer),
-                                       SizeOfBodyBytes:    
r.bufferSizeOfZero(stream.respBodyBuffer),
+                                       StatusCode:         
int32(stream.Status),
+                                       SizeOfHeadersBytes: 
r.BufferSizeOfZero(stream.RespHeaderBuffer),
+                                       SizeOfBodyBytes:    
r.BufferSizeOfZero(stream.RespBodyBuffer),
                                },
                        },
                },
        })
 }
 
-func (r *HTTP2Protocol) parseHTTPMethod(streaming *HTTP2Streaming) 
v3.AccessLogHTTPProtocolRequestMethod {
-       method := streaming.reqHeader[":method"]
+func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) 
v3.AccessLogHTTPProtocolRequestMethod {
+       method := streaming.ReqHeader[":method"]
        if method == "" {
                return v3.AccessLogHTTPProtocolRequestMethod_Get
        }
 
-       return transformHTTPMethod(strings.ToUpper(method))
+       return TransformHTTPMethod(strings.ToUpper(method))
 }
 
-func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def 
events.SocketDetail) events.SocketDetail {
+func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def 
events.SocketDetail) events.SocketDetail {
        if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
                return def
        }
        return buf.Details().Front().Value.(events.SocketDetail)
 }
 
-func (r *HTTP2Protocol) bufferSizeOfZero(buf *buffer.Buffer) uint64 {
+func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 {
        if buf == nil {
                return 0
        }
        return uint64(buf.DataSize())
 }
 
-func (r *HTTP2Protocol) appendHeaders(exist, needAppends map[string]string) {
+func (r *HTTP2Protocol) AppendHeaders(exist, needAppends map[string]string) {
        for k, v := range needAppends {
                exist[k] = v
        }
@@ -302,10 +313,10 @@ func (r *HTTP2Protocol) handleData(header 
*http2.FrameHeader, startPos *buffer.P
        if err := buf.ReadUntilBufferFull(bytes); err != nil {
                return enums.ParseResultSkipPackage, false, err
        }
-       if !streaming.isInResponse {
-               streaming.reqBodyBuffer = buffer.CombineSlices(true, 
streaming.reqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+       if !streaming.IsInResponse {
+               streaming.ReqBodyBuffer = buffer.CombineSlices(true, 
streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
        } else {
-               streaming.respBodyBuffer = buffer.CombineSlices(true, 
streaming.respBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+               streaming.RespBodyBuffer = buffer.CombineSlices(true, 
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
        }
 
        r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
diff --git a/pkg/accesslog/collector/protocols/protocol.go 
b/pkg/accesslog/collector/protocols/protocol.go
index 17ff30c..41ac8a2 100644
--- a/pkg/accesslog/collector/protocols/protocol.go
+++ b/pkg/accesslog/collector/protocols/protocol.go
@@ -18,7 +18,6 @@
 package protocols
 
 import (
-       "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/accesslog/events"
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/tools/buffer"
@@ -28,18 +27,16 @@ import (
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
-var registeredProtocols = make(map[enums.ConnectionProtocol]func(ctx 
*common.AccessLogContext) Protocol)
-
 type ProtocolManager struct {
        protocols map[enums.ConnectionProtocol]Protocol
 }
 
-func NewProtocolManager(ctx *common.AccessLogContext) *ProtocolManager {
-       protocols := make(map[enums.ConnectionProtocol]Protocol)
-       for protocol, generator := range registeredProtocols {
-               protocols[protocol] = generator(ctx)
+func NewProtocolManager(protocols []Protocol) *ProtocolManager {
+       m := make(map[enums.ConnectionProtocol]Protocol)
+       for _, protocol := range protocols {
+               m[protocol.ForProtocol()] = protocol
        }
-       return &ProtocolManager{protocols: protocols}
+       return &ProtocolManager{protocols: m}
 }
 
 func (a *ProtocolManager) GetProtocol(protocol enums.ConnectionProtocol) 
Protocol {
@@ -54,11 +51,12 @@ type AnalyzeHelper struct {
 }
 
 type Protocol interface {
+       ForProtocol() enums.ConnectionProtocol
        GenerateConnection(connectionID, randomID uint64) ProtocolMetrics
-       Analyze(metrics ProtocolMetrics, buffer *buffer.Buffer, helper 
*AnalyzeHelper) error
+       Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error
 }
 
-func appendSocketDetailsFromBuffer(result []events.SocketDetail, buf 
*buffer.Buffer) []events.SocketDetail {
+func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf 
*buffer.Buffer) []events.SocketDetail {
        if buf == nil || buf.DetailLength() == 0 {
                return result
        }
@@ -71,7 +69,7 @@ func appendSocketDetailsFromBuffer(result 
[]events.SocketDetail, buf *buffer.Buf
        return result
 }
 
-func analyzeTraceInfo(fetcher func(key string) string, protocolLog 
*logger.Logger) *v3.AccessLogTraceInfo {
+func AnalyzeTraceInfo(fetcher func(key string) string, protocolLog 
*logger.Logger) *v3.AccessLogTraceInfo {
        context, err := tracing.AnalyzeTracingContext(func(key string) string {
                return fetcher(key)
        })
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index cc4cdfe..e505d3e 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -49,7 +49,8 @@ type AnalyzeQueue struct {
        eventQueue   *btf.EventQueue
        perCPUBuffer int64
 
-       detailSupplier func() events.SocketDetail
+       detailSupplier   func() events.SocketDetail
+       supportAnalyzers func(ctx *common.AccessLogContext) []Protocol
 }
 
 func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
@@ -70,16 +71,23 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) 
(*AnalyzeQueue, error) {
        return &AnalyzeQueue{
                context:      ctx,
                perCPUBuffer: perCPUBufferSize,
-               eventQueue: 
btf.NewEventQueue(ctx.Config.ProtocolAnalyze.Parallels, 
ctx.Config.ProtocolAnalyze.QueueSize, func(num int) btf.PartitionContext {
-                       return NewPartitionContext(ctx, num)
-               }),
                detailSupplier: func() events.SocketDetail {
                        return &events.SocketDetailEvent{}
                },
+               supportAnalyzers: func(ctx *common.AccessLogContext) []Protocol 
{
+                       return []Protocol{
+                               NewHTTP1Analyzer(ctx, nil),
+                               NewHTTP2Analyzer(ctx, nil),
+                       }
+               },
        }, nil
 }
 
 func (q *AnalyzeQueue) Start(ctx context.Context) {
+       q.eventQueue = 
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.Parallels, 
q.context.Config.ProtocolAnalyze.QueueSize,
+               func(num int) btf.PartitionContext {
+                       return NewPartitionContext(q.context, num, 
q.supportAnalyzers(q.context))
+               })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, 
int(q.perCPUBuffer), func() interface{} {
                return q.detailSupplier()
        }, func(data interface{}) string {
@@ -98,6 +106,10 @@ func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func() 
events.SocketDetail)
        q.detailSupplier = supplier
 }
 
+func (q *AnalyzeQueue) ChangeSupportAnalyzers(protocols func(ctx 
*common.AccessLogContext) []Protocol) {
+       q.supportAnalyzers = protocols
+}
+
 type PartitionContext struct {
        context      *common.AccessLogContext
        protocolMgr  *ProtocolManager
@@ -108,21 +120,31 @@ type PartitionContext struct {
 }
 
 func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID 
uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
-       analyzer := protocolMgr.GetProtocol(protocol)
-       return &PartitionConnection{
+       connection := &PartitionConnection{
                connectionID:     conID,
                randomID:         randomID,
                dataBuffer:       buffer.NewBuffer(),
-               protocol:         protocol,
-               protocolAnalyzer: analyzer,
-               protocolMetrics:  analyzer.GenerateConnection(conID, randomID),
+               protocol:         make(map[enums.ConnectionProtocol]bool),
+               protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
+               protocolMetrics:  
make(map[enums.ConnectionProtocol]ProtocolMetrics),
        }
+       connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol)
+       return connection
 }
 
-func NewPartitionContext(ctx *common.AccessLogContext, num int) 
*PartitionContext {
+func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr 
*ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) {
+       if _, exist := p.protocol[protocol]; !exist {
+               analyzer := protocolMgr.GetProtocol(protocol)
+               p.protocol[protocol] = true
+               p.protocolAnalyzer[protocol] = analyzer
+               p.protocolMetrics[protocol] = 
analyzer.GenerateConnection(conID, randomID)
+       }
+}
+
+func NewPartitionContext(ctx *common.AccessLogContext, num int, protocols 
[]Protocol) *PartitionContext {
        pc := &PartitionContext{
                context:      ctx,
-               protocolMgr:  NewProtocolManager(ctx),
+               protocolMgr:  NewProtocolManager(protocols),
                connections:  cmap.New(),
                partitionNum: num,
        }
@@ -191,13 +213,13 @@ func (p *PartitionContext) Consume(data interface{}) {
                        return
                }
                connection := p.getConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol())
-               connection.appendDetail(p.context, event)
+               connection.AppendDetail(p.context, event)
        case *events.SocketDataUploadEvent:
                pid, _ := events.ParseConnectionID(event.ConnectionID)
                log.Debugf("receive the socket data event, connection ID: %d, 
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
                        event.ConnectionID, event.RandomID, pid, event.DataID0, 
event.Sequence0, event.Protocol)
                connection := p.getConnectionContext(event.ConnectionID, 
event.RandomID, event.Protocol)
-               connection.appendData(event)
+               connection.AppendData(event)
        }
 }
 
@@ -205,7 +227,9 @@ func (p *PartitionContext) 
getConnectionContext(connectionID, randomID uint64, p
        conKey := p.buildConnectionKey(connectionID, randomID)
        conn, exist := p.connections.Get(conKey)
        if exist {
-               return conn.(*PartitionConnection)
+               connection := conn.(*PartitionConnection)
+               connection.appendProtocolIfNeed(p.protocolMgr, connectionID, 
randomID, protocol)
+               return connection
        }
        result := newPartitionConnection(p.protocolMgr, connectionID, randomID, 
protocol)
        p.connections.Set(conKey, result)
@@ -296,8 +320,10 @@ func (p *PartitionContext) 
processConnectionEvents(connection *PartitionConnecti
                return
        }
        helper := &AnalyzeHelper{}
-       if err := 
connection.protocolAnalyzer.Analyze(connection.protocolMetrics, 
connection.dataBuffer, helper); err != nil {
-               log.Warnf("failed to analyze the %s protocol data: %v", 
connection.protocol.String(), err)
+       for protocol, analyzer := range connection.protocolAnalyzer {
+               if err := analyzer.Analyze(connection, helper); err != nil {
+                       log.Warnf("failed to analyze the %s protocol data: %v", 
enums.ConnectionProtocolString(protocol), err)
+               }
        }
 
        if helper.ProtocolBreak {
@@ -307,31 +333,3 @@ func (p *PartitionContext) 
processConnectionEvents(connection *PartitionConnecti
                connection.dataBuffer.Clean()
        }
 }
-
-type PartitionConnection struct {
-       connectionID, randomID uint64
-       dataBuffer             *buffer.Buffer
-       protocol               enums.ConnectionProtocol
-       protocolAnalyzer       Protocol
-       protocolMetrics        ProtocolMetrics
-       closed                 bool
-       closeCallback          common.ConnectionProcessFinishCallback
-       skipAllDataAnalyze     bool
-       lastCheckCloseTime     time.Time
-}
-
-func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, 
detail events.SocketDetail) {
-       if p.skipAllDataAnalyze {
-               // if the connection is already skip all data analyze, then 
just send the detail event
-               forwarder.SendTransferNoProtocolEvent(ctx, detail)
-               return
-       }
-       p.dataBuffer.AppendDetailEvent(detail)
-}
-
-func (p *PartitionConnection) appendData(data buffer.SocketDataBuffer) {
-       if p.skipAllDataAnalyze {
-               return
-       }
-       p.dataBuffer.AppendDataEvent(data)
-}
diff --git a/pkg/profiling/task/network/analyze/base/metrics.go 
b/pkg/profiling/task/network/analyze/base/metrics.go
index 60ec728..c856d55 100644
--- a/pkg/profiling/task/network/analyze/base/metrics.go
+++ b/pkg/profiling/task/network/analyze/base/metrics.go
@@ -111,7 +111,7 @@ func (m *MetricsBuilder) BuildBasicMeterLabels(traffic 
*ProcessTraffic, local ap
        labels = m.appendMeterValue(labels, "side", curRole.String())
 
        // protocol and ssl
-       labels = m.appendMeterValue(labels, "protocol", 
traffic.Protocol.String())
+       labels = m.appendMeterValue(labels, "protocol", 
enums.ConnectionProtocolString(traffic.Protocol))
        labels = m.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", 
traffic.IsSSL))
        return curRole, labels
 }
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go 
b/pkg/profiling/task/network/analyze/layer4/listener.go
index 0c16145..e9e4caa 100644
--- a/pkg/profiling/task/network/analyze/layer4/listener.go
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -111,7 +111,7 @@ func (l *Listener) PreFlushConnectionMetrics(ccs 
[]*base.ConnectionWithBPF, bpfL
                        log.Debugf("found connection: %d, %s relation: 
%s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, is_closed: %t, write: %d 
bytes/%d, read: %d bytes/%d",
                                connection.ConnectionID, 
connection.Role.String(),
                                connection.LocalIP, connection.LocalPort, 
connection.LocalPid, connection.RemoteIP, connection.RemotePort,
-                               connection.Protocol.String(), connection.IsSSL, 
connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
+                               
enums.ConnectionProtocolString(connection.Protocol), connection.IsSSL, 
connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
                                layer4.WriteCounter.Cur.Count, 
layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
                }
        }
@@ -218,7 +218,8 @@ func (l *Listener) logTheMetricsConnections(traffics 
[]*base.ProcessTraffic) {
                side := traffic.Role.String()
                layer4 := l.getMetrics(traffic.Metrics)
                log.Debugf("connection layer4 analyze result: %s : %s, 
protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
-                       side, traffic.GenerateConnectionInfo(), 
traffic.Protocol.String(), traffic.IsSSL, layer4.WriteCounter.Cur.Bytes, 
layer4.WriteCounter.Cur.Count,
+                       side, traffic.GenerateConnectionInfo(), 
enums.ConnectionProtocolString(traffic.Protocol),
+                       traffic.IsSSL, layer4.WriteCounter.Cur.Bytes, 
layer4.WriteCounter.Cur.Count,
                        layer4.ReadCounter.Cur.Bytes, 
layer4.ReadCounter.Cur.Count)
        }
 }
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index ac12d93..52b181d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -199,7 +199,7 @@ func (a *ProtocolAnalyzer) 
processConnectionEvents(connection *connectionInfo) {
 
 func (a *ProtocolAnalyzer) processConnectionExpireEvents(connection 
*connectionInfo, expireDuration time.Duration) {
        if c := connection.buffer.DeleteExpireEvents(expireDuration); c > 0 {
-               log.Debugf("total removed %d expired events for %s protocol", 
c, a.protocol.Protocol().String())
+               log.Debugf("total removed %d expired events for %s protocol", 
c, enums.ConnectionProtocolString(a.protocol.Protocol()))
        }
 }
 
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 28dd6d4..4c73cf0 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -204,7 +204,7 @@ func (h *Trace) Flush(duration int64, process 
api.ProcessInterface, traffic *bas
                Latency:       duration,
                TraceProvider: h.Trace.Provider().Name,
                DetectPoint:   traffic.Role.String(),
-               Component:     traffic.Protocol.String(),
+               Component:     enums.ConnectionProtocolString(traffic.Protocol),
                SSL:           traffic.IsSSL,
                URI:           h.RequestURI,
                Reason:        h.Type,
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index f26826b..60bcd71 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -74,7 +74,7 @@ func (a *Analyzer) ReceiveSocketDataEvent(event 
*events.SocketDataUploadEvent) {
        analyzer := a.protocols[event.Protocol]
        if analyzer == nil {
                log.Warnf("could not found any protocol to handle socket data, 
connection id: %s, protocol: %s(%d)",
-                       event.GenerateConnectionID(), event.Protocol.String(), 
event.Protocol)
+                       event.GenerateConnectionID(), 
enums.ConnectionProtocolString(event.Protocol), event.Protocol)
                return
        }
        analyzer.ReceiveSocketData(a.ctx, event)
@@ -84,7 +84,7 @@ func (a *Analyzer) ReceiveSocketDetail(event 
*events.SocketDetailEvent) {
        analyzer := a.protocols[event.Protocol]
        if analyzer == nil {
                log.Warnf("could not found any protocol to handle socket 
detail, connection id: %s, protocol: %s(%d)",
-                       event.GenerateConnectionID(), event.Protocol.String(), 
event.Protocol)
+                       event.GenerateConnectionID(), 
enums.ConnectionProtocolString(event.Protocol), event.Protocol)
                return
        }
        analyzer.ReceiveSocketDetail(a.ctx, event)
diff --git a/pkg/tools/enums/protocol.go b/pkg/tools/enums/protocol.go
index 555705c..be10bbd 100644
--- a/pkg/tools/enums/protocol.go
+++ b/pkg/tools/enums/protocol.go
@@ -23,3 +23,29 @@ const (
        ParseResultSuccess ParseResult = iota
        ParseResultSkipPackage
 )
+
+type ConnectionProtocol uint8
+
+const (
+       ConnectionProtocolUnknown ConnectionProtocol = 0
+       ConnectionProtocolHTTP    ConnectionProtocol = 1
+       ConnectionProtocolHTTP2   ConnectionProtocol = 2
+)
+
+var connectionProtocolMap = make(map[ConnectionProtocol]string)
+
+func init() {
+       RegisterConnectionProtocolString(ConnectionProtocolHTTP, http)
+       RegisterConnectionProtocolString(ConnectionProtocolHTTP2, http)
+}
+
+func RegisterConnectionProtocolString(protocol ConnectionProtocol, name 
string) {
+       connectionProtocolMap[protocol] = name
+}
+
+func ConnectionProtocolString(protocol ConnectionProtocol) string {
+       if name, ok := connectionProtocolMap[protocol]; ok {
+               return name
+       }
+       return unknown
+}
diff --git a/pkg/tools/enums/socket.go b/pkg/tools/enums/socket.go
index 20caa57..8c4ee4d 100644
--- a/pkg/tools/enums/socket.go
+++ b/pkg/tools/enums/socket.go
@@ -92,27 +92,6 @@ const (
        SocketExceptionOperationDrop       SocketExceptionOperationType = 2
 )
 
-type ConnectionProtocol uint8
-
-const (
-       ConnectionProtocolUnknown ConnectionProtocol = 0
-       ConnectionProtocolHTTP    ConnectionProtocol = 1
-       ConnectionProtocolHTTP2   ConnectionProtocol = 2
-)
-
-func (c ConnectionProtocol) String() string {
-       switch c {
-       case ConnectionProtocolUnknown:
-               return unknown
-       case ConnectionProtocolHTTP:
-               return http
-       case ConnectionProtocolHTTP2:
-               return http
-       default:
-               return unknown
-       }
-}
-
 type SocketMessageType uint8
 
 const (


Reply via email to