This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new e522b40 Refactor the protocol analyze logical (#156)
e522b40 is described below
commit e522b40baa8ec63f3154fb8b763bc57377a6bf74
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 19 12:47:22 2024 +0900
Refactor the protocol analyze logical (#156)
---
pkg/accesslog/collector/protocols/connection.go | 69 ++++++++++
pkg/accesslog/collector/protocols/http.go | 46 +++++++
pkg/accesslog/collector/protocols/http1.go | 118 ++++++++---------
pkg/accesslog/collector/protocols/http2.go | 145 +++++++++++----------
pkg/accesslog/collector/protocols/protocol.go | 20 ++-
pkg/accesslog/collector/protocols/queue.go | 86 ++++++------
pkg/profiling/task/network/analyze/base/metrics.go | 2 +-
.../task/network/analyze/layer4/listener.go | 5 +-
.../analyze/layer7/protocols/base/analyzer.go | 2 +-
.../analyze/layer7/protocols/http1/metrics.go | 2 +-
.../network/analyze/layer7/protocols/protocols.go | 4 +-
pkg/tools/enums/protocol.go | 26 ++++
pkg/tools/enums/socket.go | 21 ---
13 files changed, 329 insertions(+), 217 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/connection.go b/pkg/accesslog/collector/protocols/connection.go
new file mode 100644
index 0000000..7ae9325
--- /dev/null
+++ b/pkg/accesslog/collector/protocols/connection.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package protocols
+
+import (
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/accesslog/common"
+ "github.com/apache/skywalking-rover/pkg/accesslog/events"
+ "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
+ "github.com/apache/skywalking-rover/pkg/tools/buffer"
+ "github.com/apache/skywalking-rover/pkg/tools/enums"
+)
+
+type PartitionConnection struct {
+ connectionID, randomID uint64
+ dataBuffer *buffer.Buffer
+ protocol map[enums.ConnectionProtocol]bool
+ protocolAnalyzer map[enums.ConnectionProtocol]Protocol
+ protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics
+ closed bool
+ closeCallback common.ConnectionProcessFinishCallback
+ skipAllDataAnalyze bool
+ lastCheckCloseTime time.Time
+}
+
+func (p *PartitionConnection) Metrics(protocol enums.ConnectionProtocol) ProtocolMetrics {
+ return p.protocolMetrics[protocol]
+}
+
+func (p *PartitionConnection) IsExistProtocol(protocol enums.ConnectionProtocol) bool {
+ _, exist := p.protocol[protocol]
+ return exist
+}
+
+func (p *PartitionConnection) Buffer() *buffer.Buffer {
+ return p.dataBuffer
+}
+
+func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail events.SocketDetail) {
+ if p.skipAllDataAnalyze {
+ // if the connection is already skip all data analyze, then just send the detail event
+ forwarder.SendTransferNoProtocolEvent(ctx, detail)
+ return
+ }
+ p.dataBuffer.AppendDetailEvent(detail)
+}
+
+func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) {
+ if p.skipAllDataAnalyze {
+ return
+ }
+ p.dataBuffer.AppendDataEvent(data)
+}
diff --git a/pkg/accesslog/collector/protocols/http.go b/pkg/accesslog/collector/protocols/http.go
new file mode 100644
index 0000000..3b7d076
--- /dev/null
+++ b/pkg/accesslog/collector/protocols/http.go
@@ -0,0 +1,46 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package protocols
+
+import v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+
+// TransformHTTPMethod transforms the http method to the v3.AccessLogHTTPProtocolRequestMethod
+func TransformHTTPMethod(method string) v3.AccessLogHTTPProtocolRequestMethod {
+ switch method {
+ case "GET":
+ return v3.AccessLogHTTPProtocolRequestMethod_Get
+ case "POST":
+ return v3.AccessLogHTTPProtocolRequestMethod_Post
+ case "PUT":
+ return v3.AccessLogHTTPProtocolRequestMethod_Put
+ case "DELETE":
+ return v3.AccessLogHTTPProtocolRequestMethod_Delete
+ case "HEAD":
+ return v3.AccessLogHTTPProtocolRequestMethod_Head
+ case "OPTIONS":
+ return v3.AccessLogHTTPProtocolRequestMethod_Options
+ case "TRACE":
+ return v3.AccessLogHTTPProtocolRequestMethod_Trace
+ case "CONNECT":
+ return v3.AccessLogHTTPProtocolRequestMethod_Connect
+ case "PATCH":
+ return v3.AccessLogHTTPProtocolRequestMethod_Patch
+ }
+ http1Log.Warnf("unknown http method: %s", method)
+ return v3.AccessLogHTTPProtocolRequestMethod_Get
+}
diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go
index a33e170..06b1b49 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -34,45 +34,52 @@ import (
var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
-func init() {
- registeredProtocols[enums.ConnectionProtocolHTTP] = func(ctx
*common.AccessLogContext) Protocol {
- return &HTTP1Protocol{ctx: ctx}
- }
-}
+type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request,
response *reader.Response)
type HTTP1Protocol struct {
- ctx *common.AccessLogContext
+ ctx *common.AccessLogContext
+ analyze HTTP1ProtocolAnalyze
+}
+
+func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze
HTTP1ProtocolAnalyze) *HTTP1Protocol {
+ protocol := &HTTP1Protocol{ctx: ctx}
+ if analyze == nil {
+ protocol.analyze = protocol.HandleHTTPData
+ } else {
+ protocol.analyze = analyze
+ }
+ return protocol
}
type HTTP1Metrics struct {
- connectionID uint64
- randomID uint64
+ ConnectionID uint64
+ RandomID uint64
halfRequests *list.List
}
func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64)
ProtocolMetrics {
return &HTTP1Metrics{
- connectionID: connectionID,
- randomID: randomID,
+ ConnectionID: connectionID,
+ RandomID: randomID,
halfRequests: list.New(),
}
}
-func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer, _
*AnalyzeHelper) error {
- http1Metrics := metrics.(*HTTP1Metrics)
+func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _
*AnalyzeHelper) error {
+ metrics :=
connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID:
%d, random ID: %d, data len: %d",
- http1Metrics.connectionID, http1Metrics.randomID,
buf.DataLength())
- buf.ResetForLoopReading()
+ metrics.ConnectionID, metrics.RandomID,
connection.Buffer().DataLength())
+ connection.Buffer().ResetForLoopReading()
for {
- if !buf.PrepareForReading() {
+ if !connection.Buffer().PrepareForReading() {
return nil
}
- messageType, err := reader.IdentityMessageType(buf)
+ messageType, err :=
reader.IdentityMessageType(connection.Buffer())
if err != nil {
http1Log.Debugf("failed to identity message type, %v",
err)
- if buf.SkipCurrentElement() {
+ if connection.Buffer().SkipCurrentElement() {
break
}
continue
@@ -81,9 +88,9 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf
*buffer.Buffer, _ *
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
- result, _ = p.handleRequest(metrics, buf)
+ result, _ = p.handleRequest(metrics,
connection.Buffer())
case reader.MessageTypeResponse:
- result, _ = p.handleResponse(metrics, buf)
+ result, _ = p.handleResponse(metrics,
connection.Buffer())
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}
@@ -91,9 +98,9 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf
*buffer.Buffer, _ *
finishReading := false
switch result {
case enums.ParseResultSuccess:
- finishReading = buf.RemoveReadElements()
+ finishReading = connection.Buffer().RemoveReadElements()
case enums.ParseResultSkipPackage:
- finishReading = buf.SkipCurrentElement()
+ finishReading = connection.Buffer().SkipCurrentElement()
}
if finishReading {
@@ -103,7 +110,11 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics,
buf *buffer.Buffer, _ *
return nil
}
-func (p *HTTP1Protocol) handleRequest(metrics ProtocolMetrics, buf
*buffer.Buffer) (enums.ParseResult, error) {
+func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol {
+ return enums.ConnectionProtocolHTTP
+}
+
+func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf
*buffer.Buffer) (enums.ParseResult, error) {
req, result, err := reader.ReadRequest(buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
@@ -111,24 +122,23 @@ func (p *HTTP1Protocol) handleRequest(metrics
ProtocolMetrics, buf *buffer.Buffe
if result != enums.ParseResultSuccess {
return result, nil
}
- metrics.(*HTTP1Metrics).appendRequestToList(req)
+ metrics.appendRequestToList(req)
return result, nil
}
-func (p *HTTP1Protocol) handleResponse(metrics ProtocolMetrics, b
*buffer.Buffer) (enums.ParseResult, error) {
- http1Metrics := metrics.(*HTTP1Metrics)
- firstRequest := http1Metrics.halfRequests.Front()
+func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b
*buffer.Buffer) (enums.ParseResult, error) {
+ firstRequest := metrics.halfRequests.Front()
if firstRequest == nil {
return enums.ParseResultSkipPackage, nil
}
- request :=
http1Metrics.halfRequests.Remove(firstRequest).(*reader.Request)
+ request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
// parsing response
response, result, err := reader.ReadResponse(request, b, true)
defer func() {
// if parsing response failed, then put the request back to the
list
if result != enums.ParseResultSuccess {
- http1Metrics.halfRequests.PushFront(request)
+ metrics.halfRequests.PushFront(request)
}
}()
if err != nil {
@@ -138,31 +148,31 @@ func (p *HTTP1Protocol) handleResponse(metrics
ProtocolMetrics, b *buffer.Buffer
}
// getting the request and response, then send to the forwarder
- p.handleHTTPData(http1Metrics, request, response)
+ p.analyze(metrics, request, response)
return enums.ParseResultSuccess, nil
}
-func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request
*reader.Request, response *reader.Response) {
+func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request
*reader.Request, response *reader.Response) {
detailEvents := make([]events.SocketDetail, 0)
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
request.HeaderBuffer())
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
request.BodyBuffer())
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
response.HeaderBuffer())
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
response.BodyBuffer())
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
request.HeaderBuffer())
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
request.BodyBuffer())
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
response.HeaderBuffer())
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
response.BodyBuffer())
if len(detailEvents) == 0 {
http1Log.Warnf("cannot found any detail events for HTTP/1.x
protocol, connection ID: %d, random ID: %d, data id: %d-%d",
- metrics.connectionID, metrics.randomID,
+ metrics.ConnectionID, metrics.RandomID,
request.MinDataID(),
response.BodyBuffer().LastSocketBuffer().DataID())
return
}
http1Log.Debugf("found fully HTTP1 request and response, contains %d
detail events , connection ID: %d, random ID: %d",
- len(detailEvents), metrics.connectionID, metrics.randomID)
+ len(detailEvents), metrics.ConnectionID, metrics.RandomID)
originalRequest := request.Original()
originalResponse := response.Original()
defer func() {
- p.closeStream(originalRequest.Body)
- p.closeStream(originalResponse.Body)
+ p.CloseStream(originalRequest.Body)
+ p.CloseStream(originalResponse.Body)
}()
forwarder.SendTransferProtocolEvent(p.ctx, detailEvents,
&v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
@@ -171,12 +181,11 @@ func (p *HTTP1Protocol) handleHTTPData(metrics
*HTTP1Metrics, request *reader.Re
EndTime:
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
Version:
v3.AccessLogHTTPProtocolVersion_HTTP1,
Request: &v3.AccessLogHTTPProtocolRequest{
- Method:
transformHTTPMethod(originalRequest.Method),
+ Method:
TransformHTTPMethod(originalRequest.Method),
Path:
originalRequest.URL.Path,
SizeOfHeadersBytes:
uint64(request.HeaderBuffer().DataSize()),
SizeOfBodyBytes:
uint64(request.BodyBuffer().DataSize()),
-
- Trace: analyzeTraceInfo(func(key
string) string {
+ Trace: AnalyzeTraceInfo(func(key
string) string {
return
originalRequest.Header.Get(key)
}, http1Log),
},
@@ -190,37 +199,12 @@ func (p *HTTP1Protocol) handleHTTPData(metrics
*HTTP1Metrics, request *reader.Re
})
}
-func (p *HTTP1Protocol) closeStream(ioReader io.Closer) {
+func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
if ioReader != nil {
_ = ioReader.Close()
}
}
-func transformHTTPMethod(method string) v3.AccessLogHTTPProtocolRequestMethod {
- switch method {
- case "GET":
- return v3.AccessLogHTTPProtocolRequestMethod_Get
- case "POST":
- return v3.AccessLogHTTPProtocolRequestMethod_Post
- case "PUT":
- return v3.AccessLogHTTPProtocolRequestMethod_Put
- case "DELETE":
- return v3.AccessLogHTTPProtocolRequestMethod_Delete
- case "HEAD":
- return v3.AccessLogHTTPProtocolRequestMethod_Head
- case "OPTIONS":
- return v3.AccessLogHTTPProtocolRequestMethod_Options
- case "TRACE":
- return v3.AccessLogHTTPProtocolRequestMethod_Trace
- case "CONNECT":
- return v3.AccessLogHTTPProtocolRequestMethod_Connect
- case "PATCH":
- return v3.AccessLogHTTPProtocolRequestMethod_Patch
- }
- http1Log.Warnf("unknown http method: %s", method)
- return v3.AccessLogHTTPProtocolRequestMethod_Get
-}
-
func (m *HTTP1Metrics) appendRequestToList(req *reader.Request) {
if m.halfRequests.Len() == 0 {
m.halfRequests.PushFront(req)
diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go
index a3bf80e..486533c 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -42,14 +42,21 @@ var maxHTTP2StreamingTime = time.Minute * 3
var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
-func init() {
- registeredProtocols[enums.ConnectionProtocolHTTP2] = func(ctx
*common.AccessLogContext) Protocol {
- return &HTTP2Protocol{ctx: ctx}
- }
-}
+type HTTP2StreamAnalyze func(stream *HTTP2Streaming)
type HTTP2Protocol struct {
- ctx *common.AccessLogContext
+ ctx *common.AccessLogContext
+ analyze HTTP2StreamAnalyze
+}
+
+func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyze
HTTP2StreamAnalyze) *HTTP2Protocol {
+ protocol := &HTTP2Protocol{ctx: ctx}
+ if analyze == nil {
+ protocol.analyze = protocol.handleWholeStream
+ } else {
+ protocol.analyze = analyze
+ }
+ return protocol
}
type HTTP2Metrics struct {
@@ -61,14 +68,14 @@ type HTTP2Metrics struct {
}
type HTTP2Streaming struct {
- reqHeader map[string]string
- respHeader map[string]string
- reqHeaderBuffer *buffer.Buffer
- reqBodyBuffer *buffer.Buffer
- isInResponse bool
- status int
- respHeaderBuffer *buffer.Buffer
- respBodyBuffer *buffer.Buffer
+ ReqHeader map[string]string
+ RespHeader map[string]string
+ ReqHeaderBuffer *buffer.Buffer
+ ReqBodyBuffer *buffer.Buffer
+ IsInResponse bool
+ Status int
+ RespHeaderBuffer *buffer.Buffer
+ RespBodyBuffer *buffer.Buffer
}
func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64)
ProtocolMetrics {
@@ -80,21 +87,21 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID,
randomID uint64) Protoc
}
}
-func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer,
helper *AnalyzeHelper) error {
- http2Metrics := metrics.(*HTTP2Metrics)
+func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper
*AnalyzeHelper) error {
+ http2Metrics :=
connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID:
%d, random ID: %d",
http2Metrics.connectionID, http2Metrics.randomID)
- buf.ResetForLoopReading()
+ connection.Buffer().ResetForLoopReading()
for {
- if !buf.PrepareForReading() {
+ if !connection.Buffer().PrepareForReading() {
return nil
}
- startPosition := buf.Position()
- header, err := http2.ReadFrameHeader(buf)
+ startPosition := connection.Buffer().Position()
+ header, err := http2.ReadFrameHeader(connection.Buffer())
if err != nil {
http2Log.Debugf("failed to read frame header, %v", err)
- if buf.SkipCurrentElement() {
+ if connection.Buffer().SkipCurrentElement() {
break
}
continue
@@ -105,12 +112,12 @@ func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics,
buf *buffer.Buffer, hel
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
- result, protocolBreak, _ = r.handleHeader(&header,
startPosition, http2Metrics, buf)
+ result, protocolBreak, _ = r.handleHeader(&header,
startPosition, http2Metrics, connection.Buffer())
case http2.FrameData:
- result, protocolBreak, _ = r.handleData(&header,
startPosition, http2Metrics, buf)
+ result, protocolBreak, _ = r.handleData(&header,
startPosition, http2Metrics, connection.Buffer())
default:
tmp := make([]byte, header.Length)
- if err := buf.ReadUntilBufferFull(tmp); err != nil {
+ if err := connection.Buffer().ReadUntilBufferFull(tmp);
err != nil {
if errors.Is(err, buffer.ErrNotComplete) {
result = enums.ParseResultSkipPackage
} else {
@@ -132,9 +139,9 @@ func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics,
buf *buffer.Buffer, hel
finishReading := false
switch result {
case enums.ParseResultSuccess:
- finishReading = buf.RemoveReadElements()
+ finishReading = connection.Buffer().RemoveReadElements()
case enums.ParseResultSkipPackage:
- finishReading = buf.SkipCurrentElement()
+ finishReading = connection.Buffer().SkipCurrentElement()
}
if finishReading {
@@ -145,6 +152,10 @@ func (r *HTTP2Protocol) Analyze(metrics ProtocolMetrics,
buf *buffer.Buffer, hel
return nil
}
+func (r *HTTP2Protocol) ForProtocol() enums.ConnectionProtocol {
+ return enums.ConnectionProtocolHTTP2
+}
+
func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos
*buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool,
error) {
bytes := make([]byte, header.Length)
@@ -161,9 +172,9 @@ func (r *HTTP2Protocol) handleHeader(header
*http2.FrameHeader, startPos *buffer
headers := r.parseHeaders(headerData)
if streaming == nil {
streaming = &HTTP2Streaming{
- reqHeader: headers,
- respHeader: make(map[string]string),
- reqHeaderBuffer: buf.Slice(true, startPos,
buf.Position()),
+ ReqHeader: headers,
+ RespHeader: make(map[string]string),
+ ReqHeaderBuffer: buf.Slice(true, startPos,
buf.Position()),
}
metrics.streams[header.StreamID] = streaming
return enums.ParseResultSuccess, false, nil
@@ -171,28 +182,28 @@ func (r *HTTP2Protocol) handleHeader(header
*http2.FrameHeader, startPos *buffer
status, contains := headers[":status"]
if contains {
- streaming.isInResponse = true
+ streaming.IsInResponse = true
code, err := strconv.ParseInt(status, 10, 64)
if err != nil {
log.Warnf("cannot parse status code: %s", status)
code = 200
}
- streaming.status = int(code)
+ streaming.Status = int(code)
}
- if !streaming.isInResponse {
- r.appendHeaders(streaming.reqHeader, headers)
- streaming.reqHeaderBuffer = buffer.CombineSlices(true,
streaming.reqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+ if !streaming.IsInResponse {
+ r.AppendHeaders(streaming.ReqHeader, headers)
+ streaming.ReqHeaderBuffer = buffer.CombineSlices(true,
streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
return enums.ParseResultSuccess, false, nil
}
- r.appendHeaders(streaming.respHeader, headers)
- streaming.respHeaderBuffer = buffer.CombineSlices(true,
streaming.respHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+ r.AppendHeaders(streaming.RespHeader, headers)
+ streaming.RespHeaderBuffer = buffer.CombineSlices(true,
streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
// is end of stream and in the response
if header.Flags.Has(http2.FlagHeadersEndStream) {
// should be end of the stream and send to the protocol
- r.handleWholeStream(streaming)
+ r.analyze(streaming)
// delete streaming
delete(metrics.streams, header.StreamID)
}
@@ -201,91 +212,91 @@ func (r *HTTP2Protocol) handleHeader(header
*http2.FrameHeader, startPos *buffer
func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id
uint32, streaming *HTTP2Streaming) {
// if in the response mode or the request body is not nil, then skip
- if streaming.isInResponse || streaming.reqBodyBuffer == nil {
+ if streaming.IsInResponse || streaming.ReqBodyBuffer == nil {
return
}
// is the body sending too long, then split the stream
- socketBuffer := streaming.reqBodyBuffer.FirstSocketBuffer()
+ socketBuffer := streaming.ReqBodyBuffer.FirstSocketBuffer()
if socketBuffer == nil {
return
}
if time.Since(host.Time(socketBuffer.StartTime())) >
maxHTTP2StreamingTime {
http2Log.Infof("detect the HTTP/2 stream is too long, split the
stream, connection ID: %d, stream ID: %d, headers: %v",
- metrics.connectionID, id, streaming.reqHeader)
+ metrics.connectionID, id, streaming.ReqHeader)
- r.handleWholeStream(streaming)
+ r.analyze(streaming)
// clean sent buffers
- if streaming.reqBodyBuffer != nil {
- streaming.reqBodyBuffer.Clean()
+ if streaming.ReqBodyBuffer != nil {
+ streaming.ReqBodyBuffer.Clean()
}
}
}
func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
detailEvents := make([]events.SocketDetail, 0)
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
stream.reqHeaderBuffer)
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
stream.reqBodyBuffer)
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
stream.respHeaderBuffer)
- detailEvents = appendSocketDetailsFromBuffer(detailEvents,
stream.respBodyBuffer)
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
stream.ReqHeaderBuffer)
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
stream.ReqBodyBuffer)
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
stream.RespHeaderBuffer)
+ detailEvents = AppendSocketDetailsFromBuffer(detailEvents,
stream.RespBodyBuffer)
if len(detailEvents) == 0 {
http2Log.Warnf("cannot found any detail events for HTTP/2
protocol, data id: %d-%d",
- stream.reqHeaderBuffer.FirstSocketBuffer().DataID(),
stream.respBodyBuffer.LastSocketBuffer().DataID())
+ stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(),
stream.RespBodyBuffer.LastSocketBuffer().DataID())
return
}
forwarder.SendTransferProtocolEvent(r.ctx, detailEvents,
&v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
- StartTime:
forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer,
detailEvents[0]).GetStartTime()),
+ StartTime:
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer,
detailEvents[0]).GetStartTime()),
EndTime:
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
Version:
v3.AccessLogHTTPProtocolVersion_HTTP2,
Request: &v3.AccessLogHTTPProtocolRequest{
- Method:
r.parseHTTPMethod(stream),
- Path:
stream.reqHeader[":path"],
- SizeOfHeadersBytes:
r.bufferSizeOfZero(stream.reqHeaderBuffer),
- SizeOfBodyBytes:
r.bufferSizeOfZero(stream.reqBodyBuffer),
+ Method:
r.ParseHTTPMethod(stream),
+ Path:
stream.ReqHeader[":path"],
+ SizeOfHeadersBytes:
r.BufferSizeOfZero(stream.ReqHeaderBuffer),
+ SizeOfBodyBytes:
r.BufferSizeOfZero(stream.ReqBodyBuffer),
- Trace: analyzeTraceInfo(func(key
string) string {
- return stream.reqHeader[key]
+ Trace: AnalyzeTraceInfo(func(key
string) string {
+ return stream.ReqHeader[key]
}, http2Log),
},
Response: &v3.AccessLogHTTPProtocolResponse{
- StatusCode:
int32(stream.status),
- SizeOfHeadersBytes:
r.bufferSizeOfZero(stream.respHeaderBuffer),
- SizeOfBodyBytes:
r.bufferSizeOfZero(stream.respBodyBuffer),
+ StatusCode:
int32(stream.Status),
+ SizeOfHeadersBytes:
r.BufferSizeOfZero(stream.RespHeaderBuffer),
+ SizeOfBodyBytes:
r.BufferSizeOfZero(stream.RespBodyBuffer),
},
},
},
})
}
-func (r *HTTP2Protocol) parseHTTPMethod(streaming *HTTP2Streaming)
v3.AccessLogHTTPProtocolRequestMethod {
- method := streaming.reqHeader[":method"]
+func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming)
v3.AccessLogHTTPProtocolRequestMethod {
+ method := streaming.ReqHeader[":method"]
if method == "" {
return v3.AccessLogHTTPProtocolRequestMethod_Get
}
- return transformHTTPMethod(strings.ToUpper(method))
+ return TransformHTTPMethod(strings.ToUpper(method))
}
-func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def
events.SocketDetail) events.SocketDetail {
+func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def
events.SocketDetail) events.SocketDetail {
if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
return def
}
return buf.Details().Front().Value.(events.SocketDetail)
}
-func (r *HTTP2Protocol) bufferSizeOfZero(buf *buffer.Buffer) uint64 {
+func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 {
if buf == nil {
return 0
}
return uint64(buf.DataSize())
}
-func (r *HTTP2Protocol) appendHeaders(exist, needAppends map[string]string) {
+func (r *HTTP2Protocol) AppendHeaders(exist, needAppends map[string]string) {
for k, v := range needAppends {
exist[k] = v
}
@@ -302,10 +313,10 @@ func (r *HTTP2Protocol) handleData(header
*http2.FrameHeader, startPos *buffer.P
if err := buf.ReadUntilBufferFull(bytes); err != nil {
return enums.ParseResultSkipPackage, false, err
}
- if !streaming.isInResponse {
- streaming.reqBodyBuffer = buffer.CombineSlices(true,
streaming.reqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+ if !streaming.IsInResponse {
+ streaming.ReqBodyBuffer = buffer.CombineSlices(true,
streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
} else {
- streaming.respBodyBuffer = buffer.CombineSlices(true,
streaming.respBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+ streaming.RespBodyBuffer = buffer.CombineSlices(true,
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
}
r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
diff --git a/pkg/accesslog/collector/protocols/protocol.go b/pkg/accesslog/collector/protocols/protocol.go
index 17ff30c..41ac8a2 100644
--- a/pkg/accesslog/collector/protocols/protocol.go
+++ b/pkg/accesslog/collector/protocols/protocol.go
@@ -18,7 +18,6 @@
package protocols
import (
- "github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
@@ -28,18 +27,16 @@ import (
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
-var registeredProtocols = make(map[enums.ConnectionProtocol]func(ctx
*common.AccessLogContext) Protocol)
-
type ProtocolManager struct {
protocols map[enums.ConnectionProtocol]Protocol
}
-func NewProtocolManager(ctx *common.AccessLogContext) *ProtocolManager {
- protocols := make(map[enums.ConnectionProtocol]Protocol)
- for protocol, generator := range registeredProtocols {
- protocols[protocol] = generator(ctx)
+func NewProtocolManager(protocols []Protocol) *ProtocolManager {
+ m := make(map[enums.ConnectionProtocol]Protocol)
+ for _, protocol := range protocols {
+ m[protocol.ForProtocol()] = protocol
}
- return &ProtocolManager{protocols: protocols}
+ return &ProtocolManager{protocols: m}
}
func (a *ProtocolManager) GetProtocol(protocol enums.ConnectionProtocol)
Protocol {
@@ -54,11 +51,12 @@ type AnalyzeHelper struct {
}
type Protocol interface {
+ ForProtocol() enums.ConnectionProtocol
GenerateConnection(connectionID, randomID uint64) ProtocolMetrics
- Analyze(metrics ProtocolMetrics, buffer *buffer.Buffer, helper
*AnalyzeHelper) error
+ Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error
}
-func appendSocketDetailsFromBuffer(result []events.SocketDetail, buf
*buffer.Buffer) []events.SocketDetail {
+func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf
*buffer.Buffer) []events.SocketDetail {
if buf == nil || buf.DetailLength() == 0 {
return result
}
@@ -71,7 +69,7 @@ func appendSocketDetailsFromBuffer(result
[]events.SocketDetail, buf *buffer.Buf
return result
}
-func analyzeTraceInfo(fetcher func(key string) string, protocolLog
*logger.Logger) *v3.AccessLogTraceInfo {
+func AnalyzeTraceInfo(fetcher func(key string) string, protocolLog
*logger.Logger) *v3.AccessLogTraceInfo {
context, err := tracing.AnalyzeTracingContext(func(key string) string {
return fetcher(key)
})
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index cc4cdfe..e505d3e 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -49,7 +49,8 @@ type AnalyzeQueue struct {
eventQueue *btf.EventQueue
perCPUBuffer int64
- detailSupplier func() events.SocketDetail
+ detailSupplier func() events.SocketDetail
+ supportAnalyzers func(ctx *common.AccessLogContext) []Protocol
}
func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
@@ -70,16 +71,23 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext)
(*AnalyzeQueue, error) {
return &AnalyzeQueue{
context: ctx,
perCPUBuffer: perCPUBufferSize,
- eventQueue:
btf.NewEventQueue(ctx.Config.ProtocolAnalyze.Parallels,
ctx.Config.ProtocolAnalyze.QueueSize, func(num int) btf.PartitionContext {
- return NewPartitionContext(ctx, num)
- }),
detailSupplier: func() events.SocketDetail {
return &events.SocketDetailEvent{}
},
+ supportAnalyzers: func(ctx *common.AccessLogContext) []Protocol
{
+ return []Protocol{
+ NewHTTP1Analyzer(ctx, nil),
+ NewHTTP2Analyzer(ctx, nil),
+ }
+ },
}, nil
}
func (q *AnalyzeQueue) Start(ctx context.Context) {
+ q.eventQueue =
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.Parallels,
q.context.Config.ProtocolAnalyze.QueueSize,
+ func(num int) btf.PartitionContext {
+ return NewPartitionContext(q.context, num,
q.supportAnalyzers(q.context))
+ })
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue,
int(q.perCPUBuffer), func() interface{} {
return q.detailSupplier()
}, func(data interface{}) string {
@@ -98,6 +106,10 @@ func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func()
events.SocketDetail)
q.detailSupplier = supplier
}
+func (q *AnalyzeQueue) ChangeSupportAnalyzers(protocols func(ctx
*common.AccessLogContext) []Protocol) {
+ q.supportAnalyzers = protocols
+}
+
type PartitionContext struct {
context *common.AccessLogContext
protocolMgr *ProtocolManager
@@ -108,21 +120,31 @@ type PartitionContext struct {
}
func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID
uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
- analyzer := protocolMgr.GetProtocol(protocol)
- return &PartitionConnection{
+ connection := &PartitionConnection{
connectionID: conID,
randomID: randomID,
dataBuffer: buffer.NewBuffer(),
- protocol: protocol,
- protocolAnalyzer: analyzer,
- protocolMetrics: analyzer.GenerateConnection(conID, randomID),
+ protocol: make(map[enums.ConnectionProtocol]bool),
+ protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
+ protocolMetrics:
make(map[enums.ConnectionProtocol]ProtocolMetrics),
}
+ connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol)
+ return connection
}
-func NewPartitionContext(ctx *common.AccessLogContext, num int)
*PartitionContext {
+func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr
*ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) {
+ if _, exist := p.protocol[protocol]; !exist {
+ analyzer := protocolMgr.GetProtocol(protocol)
+ p.protocol[protocol] = true
+ p.protocolAnalyzer[protocol] = analyzer
+ p.protocolMetrics[protocol] =
analyzer.GenerateConnection(conID, randomID)
+ }
+}
+
+func NewPartitionContext(ctx *common.AccessLogContext, num int, protocols
[]Protocol) *PartitionContext {
pc := &PartitionContext{
context: ctx,
- protocolMgr: NewProtocolManager(ctx),
+ protocolMgr: NewProtocolManager(protocols),
connections: cmap.New(),
partitionNum: num,
}
@@ -191,13 +213,13 @@ func (p *PartitionContext) Consume(data interface{}) {
return
}
connection := p.getConnectionContext(event.GetConnectionID(),
event.GetRandomID(), event.GetProtocol())
- connection.appendDetail(p.context, event)
+ connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d,
random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.DataID0,
event.Sequence0, event.Protocol)
connection := p.getConnectionContext(event.ConnectionID,
event.RandomID, event.Protocol)
- connection.appendData(event)
+ connection.AppendData(event)
}
}
@@ -205,7 +227,9 @@ func (p *PartitionContext)
getConnectionContext(connectionID, randomID uint64, p
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
if exist {
- return conn.(*PartitionConnection)
+ connection := conn.(*PartitionConnection)
+ connection.appendProtocolIfNeed(p.protocolMgr, connectionID,
randomID, protocol)
+ return connection
}
result := newPartitionConnection(p.protocolMgr, connectionID, randomID,
protocol)
p.connections.Set(conKey, result)
@@ -296,8 +320,10 @@ func (p *PartitionContext)
processConnectionEvents(connection *PartitionConnecti
return
}
helper := &AnalyzeHelper{}
- if err :=
connection.protocolAnalyzer.Analyze(connection.protocolMetrics,
connection.dataBuffer, helper); err != nil {
- log.Warnf("failed to analyze the %s protocol data: %v",
connection.protocol.String(), err)
+ for protocol, analyzer := range connection.protocolAnalyzer {
+ if err := analyzer.Analyze(connection, helper); err != nil {
+ log.Warnf("failed to analyze the %s protocol data: %v",
enums.ConnectionProtocolString(protocol), err)
+ }
}
if helper.ProtocolBreak {
@@ -307,31 +333,3 @@ func (p *PartitionContext)
processConnectionEvents(connection *PartitionConnecti
connection.dataBuffer.Clean()
}
}
-
-type PartitionConnection struct {
- connectionID, randomID uint64
- dataBuffer *buffer.Buffer
- protocol enums.ConnectionProtocol
- protocolAnalyzer Protocol
- protocolMetrics ProtocolMetrics
- closed bool
- closeCallback common.ConnectionProcessFinishCallback
- skipAllDataAnalyze bool
- lastCheckCloseTime time.Time
-}
-
-func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext,
detail events.SocketDetail) {
- if p.skipAllDataAnalyze {
- // if the connection is already skip all data analyze, then
just send the detail event
- forwarder.SendTransferNoProtocolEvent(ctx, detail)
- return
- }
- p.dataBuffer.AppendDetailEvent(detail)
-}
-
-func (p *PartitionConnection) appendData(data buffer.SocketDataBuffer) {
- if p.skipAllDataAnalyze {
- return
- }
- p.dataBuffer.AppendDataEvent(data)
-}
diff --git a/pkg/profiling/task/network/analyze/base/metrics.go
b/pkg/profiling/task/network/analyze/base/metrics.go
index 60ec728..c856d55 100644
--- a/pkg/profiling/task/network/analyze/base/metrics.go
+++ b/pkg/profiling/task/network/analyze/base/metrics.go
@@ -111,7 +111,7 @@ func (m *MetricsBuilder) BuildBasicMeterLabels(traffic
*ProcessTraffic, local ap
labels = m.appendMeterValue(labels, "side", curRole.String())
// protocol and ssl
- labels = m.appendMeterValue(labels, "protocol",
traffic.Protocol.String())
+ labels = m.appendMeterValue(labels, "protocol",
enums.ConnectionProtocolString(traffic.Protocol))
labels = m.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t",
traffic.IsSSL))
return curRole, labels
}
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go
b/pkg/profiling/task/network/analyze/layer4/listener.go
index 0c16145..e9e4caa 100644
--- a/pkg/profiling/task/network/analyze/layer4/listener.go
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -111,7 +111,7 @@ func (l *Listener) PreFlushConnectionMetrics(ccs
[]*base.ConnectionWithBPF, bpfL
log.Debugf("found connection: %d, %s relation:
%s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, is_closed: %t, write: %d
bytes/%d, read: %d bytes/%d",
connection.ConnectionID,
connection.Role.String(),
connection.LocalIP, connection.LocalPort,
connection.LocalPid, connection.RemoteIP, connection.RemotePort,
- connection.Protocol.String(), connection.IsSSL,
connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
+
enums.ConnectionProtocolString(connection.Protocol), connection.IsSSL,
connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
layer4.WriteCounter.Cur.Count,
layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
}
}
@@ -218,7 +218,8 @@ func (l *Listener) logTheMetricsConnections(traffics
[]*base.ProcessTraffic) {
side := traffic.Role.String()
layer4 := l.getMetrics(traffic.Metrics)
log.Debugf("connection layer4 analyze result: %s : %s,
protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
- side, traffic.GenerateConnectionInfo(),
traffic.Protocol.String(), traffic.IsSSL, layer4.WriteCounter.Cur.Bytes,
layer4.WriteCounter.Cur.Count,
+ side, traffic.GenerateConnectionInfo(),
enums.ConnectionProtocolString(traffic.Protocol),
+ traffic.IsSSL, layer4.WriteCounter.Cur.Bytes,
layer4.WriteCounter.Cur.Count,
layer4.ReadCounter.Cur.Bytes,
layer4.ReadCounter.Cur.Count)
}
}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index ac12d93..52b181d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -199,7 +199,7 @@ func (a *ProtocolAnalyzer)
processConnectionEvents(connection *connectionInfo) {
func (a *ProtocolAnalyzer) processConnectionExpireEvents(connection
*connectionInfo, expireDuration time.Duration) {
if c := connection.buffer.DeleteExpireEvents(expireDuration); c > 0 {
- log.Debugf("total removed %d expired events for %s protocol",
c, a.protocol.Protocol().String())
+ log.Debugf("total removed %d expired events for %s protocol",
c, enums.ConnectionProtocolString(a.protocol.Protocol()))
}
}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 28dd6d4..4c73cf0 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -204,7 +204,7 @@ func (h *Trace) Flush(duration int64, process
api.ProcessInterface, traffic *bas
Latency: duration,
TraceProvider: h.Trace.Provider().Name,
DetectPoint: traffic.Role.String(),
- Component: traffic.Protocol.String(),
+ Component: enums.ConnectionProtocolString(traffic.Protocol),
SSL: traffic.IsSSL,
URI: h.RequestURI,
Reason: h.Type,
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index f26826b..60bcd71 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -74,7 +74,7 @@ func (a *Analyzer) ReceiveSocketDataEvent(event
*events.SocketDataUploadEvent) {
analyzer := a.protocols[event.Protocol]
if analyzer == nil {
log.Warnf("could not found any protocol to handle socket data,
connection id: %s, protocol: %s(%d)",
- event.GenerateConnectionID(), event.Protocol.String(),
event.Protocol)
+ event.GenerateConnectionID(),
enums.ConnectionProtocolString(event.Protocol), event.Protocol)
return
}
analyzer.ReceiveSocketData(a.ctx, event)
@@ -84,7 +84,7 @@ func (a *Analyzer) ReceiveSocketDetail(event
*events.SocketDetailEvent) {
analyzer := a.protocols[event.Protocol]
if analyzer == nil {
log.Warnf("could not found any protocol to handle socket
detail, connection id: %s, protocol: %s(%d)",
- event.GenerateConnectionID(), event.Protocol.String(),
event.Protocol)
+ event.GenerateConnectionID(),
enums.ConnectionProtocolString(event.Protocol), event.Protocol)
return
}
analyzer.ReceiveSocketDetail(a.ctx, event)
diff --git a/pkg/tools/enums/protocol.go b/pkg/tools/enums/protocol.go
index 555705c..be10bbd 100644
--- a/pkg/tools/enums/protocol.go
+++ b/pkg/tools/enums/protocol.go
@@ -23,3 +23,29 @@ const (
ParseResultSuccess ParseResult = iota
ParseResultSkipPackage
)
+
+type ConnectionProtocol uint8
+
+const (
+ ConnectionProtocolUnknown ConnectionProtocol = 0
+ ConnectionProtocolHTTP ConnectionProtocol = 1
+ ConnectionProtocolHTTP2 ConnectionProtocol = 2
+)
+
+var connectionProtocolMap = make(map[ConnectionProtocol]string)
+
+func init() {
+ RegisterConnectionProtocolString(ConnectionProtocolHTTP, http)
+ RegisterConnectionProtocolString(ConnectionProtocolHTTP2, http)
+}
+
+func RegisterConnectionProtocolString(protocol ConnectionProtocol, name
string) {
+ connectionProtocolMap[protocol] = name
+}
+
+func ConnectionProtocolString(protocol ConnectionProtocol) string {
+ if name, ok := connectionProtocolMap[protocol]; ok {
+ return name
+ }
+ return unknown
+}
diff --git a/pkg/tools/enums/socket.go b/pkg/tools/enums/socket.go
index 20caa57..8c4ee4d 100644
--- a/pkg/tools/enums/socket.go
+++ b/pkg/tools/enums/socket.go
@@ -92,27 +92,6 @@ const (
SocketExceptionOperationDrop SocketExceptionOperationType = 2
)
-type ConnectionProtocol uint8
-
-const (
- ConnectionProtocolUnknown ConnectionProtocol = 0
- ConnectionProtocolHTTP ConnectionProtocol = 1
- ConnectionProtocolHTTP2 ConnectionProtocol = 2
-)
-
-func (c ConnectionProtocol) String() string {
- switch c {
- case ConnectionProtocolUnknown:
- return unknown
- case ConnectionProtocolHTTP:
- return http
- case ConnectionProtocolHTTP2:
- return http
- default:
- return unknown
- }
-}
-
type SocketMessageType uint8
const (