This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new ecb6e8c Refactor kernel and protocol log when sending to backend
(#183)
ecb6e8c is described below
commit ecb6e8c5cab330a45c60fc93da8da2126b38c2b1
Author: mrproliu <[email protected]>
AuthorDate: Thu Mar 6 11:23:16 2025 +0800
Refactor kernel and protocol log when sending to backend (#183)
---
pkg/accesslog/collector/protocols/http1.go | 4 +-
pkg/accesslog/collector/protocols/http2.go | 4 +-
pkg/accesslog/common/connection.go | 2 +-
.../{forwarder/close.go => common/logs.go} | 53 ++++++++++++++--------
pkg/accesslog/common/queue.go | 36 ++++++---------
pkg/accesslog/forwarder/close.go | 2 +-
pkg/accesslog/forwarder/connect.go | 4 +-
pkg/accesslog/forwarder/protocol.go | 7 +--
pkg/accesslog/forwarder/transfer.go | 2 +-
pkg/accesslog/runner.go | 39 ++++++++--------
10 files changed, 80 insertions(+), 73 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/http1.go
b/pkg/accesslog/collector/protocols/http1.go
index 695e9c4..6af2965 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -242,7 +242,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics
*HTTP1Metrics, connection *Partit
if host == "" && originalRequest.URL != nil {
host = originalRequest.URL.Host
}
- forwarder.SendTransferProtocolEvent(p.ctx, details,
&v3.AccessLogProtocolLogs{
+ forwarder.SendTransferProtocolEvent(p.ctx,
common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime:
forwarder.BuildOffsetTimestamp(details[0].GetStartTime()),
@@ -265,7 +265,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics
*HTTP1Metrics, connection *Partit
},
},
},
- })
+ }))
return nil
}
diff --git a/pkg/accesslog/collector/protocols/http2.go
b/pkg/accesslog/collector/protocols/http2.go
index 0ccdf00..1d701ce 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -263,7 +263,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_
*PartitionConnection, stream *HTTP2S
if streamHost == "" {
streamHost = stream.ReqHeader[":host"]
}
- forwarder.SendTransferProtocolEvent(r.ctx, details,
&v3.AccessLogProtocolLogs{
+ forwarder.SendTransferProtocolEvent(r.ctx,
common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime:
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer,
details[0]).GetStartTime()),
@@ -286,7 +286,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_
*PartitionConnection, stream *HTTP2S
},
},
},
- })
+ }))
return nil
}
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index fddc2f3..d4c8ef2 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -194,7 +194,7 @@ func (c *ConnectionManager) Start(ctx context.Context,
accessLogContext *AccessL
SocketFD:
activateConn.SocketFD,
Success: 0,
})
-
accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent)
+
accessLogContext.Queue.AppendKernelLog(NewKernelLogEvent(LogTypeClose,
wapperedEvent))
}
case <-ctx.Done():
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/common/logs.go
similarity index 52%
copy from pkg/accesslog/forwarder/close.go
copy to pkg/accesslog/common/logs.go
index d949588..3355941 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/common/logs.go
@@ -15,35 +15,50 @@
// specific language governing permissions and limitations
// under the License.
-package forwarder
+package common
import (
- "github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
-func init() {
- RegisterKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
+type kernelLogEvent struct {
+ logType LogType
+ event events.Event
}
-func SendCloseEvent(context *common.AccessLogContext, event
*common.CloseEventWithNotify) {
- context.Queue.AppendKernelLog(common.LogTypeClose, event)
+func NewKernelLogEvent(logType LogType, event events.Event) KernelLog {
+ return &kernelLogEvent{
+ logType: logType,
+ event: event,
+ }
}
-func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
- closeEvent := event.(*common.CloseEventWithNotify)
- if closeEvent.StartTime == 0 {
- return nil
- }
- closeOp := &v3.AccessLogKernelCloseOperation{}
- closeOp.StartTime = BuildOffsetTimestamp(closeEvent.StartTime)
- closeOp.EndTime = BuildOffsetTimestamp(closeEvent.EndTime)
- closeOp.Success = closeEvent.Success == 1
- return &v3.AccessLogKernelLog{
- Operation: &v3.AccessLogKernelLog_Close{
- Close: closeOp,
- },
+func (k *kernelLogEvent) Type() LogType {
+ return k.logType
+}
+
+func (k *kernelLogEvent) Event() events.Event {
+ return k.event
+}
+
+type ProtocolEventData struct {
+ KernelLogs []events.SocketDetail
+ ProtocolLogData *v3.AccessLogProtocolLogs
+}
+
+func (r *ProtocolEventData) RelateKernelLogs() []events.SocketDetail {
+ return r.KernelLogs
+}
+
+func (r *ProtocolEventData) ProtocolLog() *v3.AccessLogProtocolLogs {
+ return r.ProtocolLogData
+}
+
+func NewProtocolLogEvent(kernelLogs []events.SocketDetail, protocolData
*v3.AccessLogProtocolLogs) ProtocolLog {
+ return &ProtocolEventData{
+ KernelLogs: kernelLogs,
+ ProtocolLogData: protocolData,
}
}
diff --git a/pkg/accesslog/common/queue.go b/pkg/accesslog/common/queue.go
index 131a29a..b4a9cd0 100644
--- a/pkg/accesslog/common/queue.go
+++ b/pkg/accesslog/common/queue.go
@@ -33,19 +33,19 @@ import (
var log = logger.GetLogger("access_log", "common")
-type KernelLog struct {
- Type LogType
- Event events.Event
+type KernelLog interface {
+ Type() LogType
+ Event() events.Event
}
-type ProtocolLog struct {
- KernelLogs []events.SocketDetail
- Protocol *v3.AccessLogProtocolLogs
+type ProtocolLog interface {
+ RelateKernelLogs() []events.SocketDetail
+ ProtocolLog() *v3.AccessLogProtocolLogs
}
type Queue struct {
- kernelLogs chan *KernelLog
- protocolLogs chan *ProtocolLog
+ kernelLogs chan KernelLog
+ protocolLogs chan ProtocolLog
maxFlushCount int
period time.Duration
@@ -57,13 +57,13 @@ type Queue struct {
}
type QueueConsumer interface {
- Consume(kernels chan *KernelLog, protocols chan *ProtocolLog)
+ Consume(kernels chan KernelLog, protocols chan ProtocolLog)
}
func NewQueue(maxFlushCount int, period time.Duration, consumer QueueConsumer)
*Queue {
return &Queue{
- kernelLogs: make(chan *KernelLog, maxFlushCount*3),
- protocolLogs: make(chan *ProtocolLog, maxFlushCount*3),
+ kernelLogs: make(chan KernelLog, maxFlushCount*3),
+ protocolLogs: make(chan ProtocolLog, maxFlushCount*3),
maxFlushCount: maxFlushCount,
period: period,
consumer: consumer,
@@ -71,12 +71,9 @@ func NewQueue(maxFlushCount int, period time.Duration,
consumer QueueConsumer) *
}
}
-func (q *Queue) AppendKernelLog(tp LogType, event events.Event) {
+func (q *Queue) AppendKernelLog(log KernelLog) {
select {
- case q.kernelLogs <- &KernelLog{
- Type: tp,
- Event: event,
- }:
+ case q.kernelLogs <- log:
default:
atomic.AddInt64(&q.dropKernelLogCount, 1)
return
@@ -84,12 +81,9 @@ func (q *Queue) AppendKernelLog(tp LogType, event
events.Event) {
q.consumeIfNeed()
}
-func (q *Queue) AppendProtocolLog(kernelLogs []events.SocketDetail, protocol
*v3.AccessLogProtocolLogs) {
+func (q *Queue) AppendProtocolLog(log ProtocolLog) {
select {
- case q.protocolLogs <- &ProtocolLog{
- KernelLogs: kernelLogs,
- Protocol: protocol,
- }:
+ case q.protocolLogs <- log:
default:
atomic.AddInt64(&q.dropProtocolLogCount, 1)
return
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go
index d949588..6d27ffa 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/forwarder/close.go
@@ -29,7 +29,7 @@ func init() {
}
func SendCloseEvent(context *common.AccessLogContext, event
*common.CloseEventWithNotify) {
- context.Queue.AppendKernelLog(common.LogTypeClose, event)
+
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeClose,
event))
}
func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
diff --git a/pkg/accesslog/forwarder/connect.go
b/pkg/accesslog/forwarder/connect.go
index 2058d25..ba3601e 100644
--- a/pkg/accesslog/forwarder/connect.go
+++ b/pkg/accesslog/forwarder/connect.go
@@ -31,10 +31,10 @@ func init() {
}
func SendConnectEvent(context *common.AccessLogContext, event
*events.SocketConnectEvent, socketPair *ip.SocketPair) {
- context.Queue.AppendKernelLog(common.LogTypeConnect,
&common.ConnectEventWithSocket{
+
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeConnect,
&common.ConnectEventWithSocket{
SocketConnectEvent: event,
SocketPair: socketPair,
- })
+ }))
}
func connectLogBuilder(event events.Event) *v3.AccessLogKernelLog {
diff --git a/pkg/accesslog/forwarder/protocol.go
b/pkg/accesslog/forwarder/protocol.go
index 8e8c5b4..ac60bed 100644
--- a/pkg/accesslog/forwarder/protocol.go
+++ b/pkg/accesslog/forwarder/protocol.go
@@ -19,11 +19,8 @@ package forwarder
import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
- "github.com/apache/skywalking-rover/pkg/accesslog/events"
-
- v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
-func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs
[]events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) {
- context.Queue.AppendProtocolLog(kernelLogs, protocolData)
+func SendTransferProtocolEvent(context *common.AccessLogContext, event
common.ProtocolLog) {
+ context.Queue.AppendProtocolLog(event)
}
diff --git a/pkg/accesslog/forwarder/transfer.go
b/pkg/accesslog/forwarder/transfer.go
index 44c0d4e..22d7fb1 100644
--- a/pkg/accesslog/forwarder/transfer.go
+++ b/pkg/accesslog/forwarder/transfer.go
@@ -30,7 +30,7 @@ func init() {
}
func SendTransferNoProtocolEvent(context *common.AccessLogContext, event
events.SocketDetail) {
- context.Queue.AppendKernelLog(common.LogTypeKernelTransfer, event)
+
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeKernelTransfer,
event))
}
func kernelTransferLogBuilder(event events.Event) *v3.AccessLogKernelLog {
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index d483172..5dce05d 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -105,7 +105,7 @@ func (r *Runner) Start(ctx context.Context) error {
return nil
}
-func (r *Runner) Consume(kernels chan *common.KernelLog, protocols chan
*common.ProtocolLog) {
+func (r *Runner) Consume(kernels chan common.KernelLog, protocols chan
common.ProtocolLog) {
if r.backendOp.GetConnectionStatus() != backend.Connected {
log.Warnf("failure to connect to the backend, skip generating
access log")
return
@@ -117,21 +117,21 @@ func (r *Runner) Consume(kernels chan *common.KernelLog,
protocols chan *common.
r.sender.AddBatch(batch)
}
-func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan
*common.KernelLog, protocols chan *common.ProtocolLog) {
+func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan
common.KernelLog, protocols chan common.ProtocolLog) {
r.buildKernelLogs(kernels, batch)
r.buildProtocolLogs(protocols, batch)
r.context.ConnectionMgr.OnBuildConnectionLogFinished()
}
-func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch
*sender.BatchLogs) {
- delayAppends := make([]*common.KernelLog, 0)
+func (r *Runner) buildKernelLogs(kernels chan common.KernelLog, batch
*sender.BatchLogs) {
+ delayAppends := make([]common.KernelLog, 0)
for {
select {
case kernelLog := <-kernels:
connection, curLog, delay := r.buildKernelLog(kernelLog)
log.Debugf("building kernel log result, connetaion ID:
%d, random ID: %d, exist connection: %t, delay: %t",
- kernelLog.Event.GetConnectionID(),
kernelLog.Event.GetRandomID(), connection != nil, delay)
+ kernelLog.Event().GetConnectionID(),
kernelLog.Event().GetRandomID(), connection != nil, delay)
if connection != nil && curLog != nil {
batch.AppendKernelLog(connection, curLog)
} else if delay {
@@ -150,17 +150,18 @@ func (r *Runner) buildKernelLogs(kernels chan
*common.KernelLog, batch *sender.B
}
}
-func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch
*sender.BatchLogs) {
- delayAppends := make([]*common.ProtocolLog, 0)
+func (r *Runner) buildProtocolLogs(protocols chan common.ProtocolLog, batch
*sender.BatchLogs) {
+ delayAppends := make([]common.ProtocolLog, 0)
for {
select {
case protocolLog := <-protocols:
connection, kernelLogs, protocolLogs, delay :=
r.buildProtocolLog(protocolLog)
if log.Enable(logrus.DebugLevel) {
- kernelLogCount := len(protocolLog.KernelLogs)
+ kernelLogCount :=
len(protocolLog.RelateKernelLogs())
var conID, randomID uint64
if kernelLogCount > 0 {
- conID, randomID =
protocolLog.KernelLogs[0].GetConnectionID(),
protocolLog.KernelLogs[0].GetRandomID()
+ conID, randomID =
protocolLog.RelateKernelLogs()[0].GetConnectionID(),
+
protocolLog.RelateKernelLogs()[0].GetRandomID()
}
log.Debugf("building protocol log result,
connetaion ID: %d, random ID: %d, connection exist: %t, delay: %t",
conID, randomID, connection != nil,
delay)
@@ -200,12 +201,12 @@ func (r *Runner) shouldReportProcessLog(pid uint32) bool {
return true
}
-func (r *Runner) buildProtocolLog(protocolLog *common.ProtocolLog)
(*common.ConnectionInfo,
+func (r *Runner) buildProtocolLog(protocolLog common.ProtocolLog)
(*common.ConnectionInfo,
[]*v3.AccessLogKernelLog, *v3.AccessLogProtocolLogs, bool) {
- if len(protocolLog.KernelLogs) == 0 {
+ if len(protocolLog.RelateKernelLogs()) == 0 {
return nil, nil, nil, false
}
- firstKernelLog := protocolLog.KernelLogs[0]
+ firstKernelLog := protocolLog.RelateKernelLogs()[0]
pid, _ := events.ParseConnectionID(firstKernelLog.GetConnectionID())
// if the process not monitoring, then ignore it
if !r.shouldReportProcessLog(pid) {
@@ -221,7 +222,7 @@ func (r *Runner) buildProtocolLog(protocolLog
*common.ProtocolLog) (*common.Conn
return nil, nil, nil, true
}
kernelLogs := make([]*v3.AccessLogKernelLog, 0)
- for _, kl := range protocolLog.KernelLogs {
+ for _, kl := range protocolLog.RelateKernelLogs() {
event :=
forwarder.BuildKernelLogFromEvent(common.LogTypeKernelTransfer, kl)
if event == nil {
continue
@@ -229,25 +230,25 @@ func (r *Runner) buildProtocolLog(protocolLog
*common.ProtocolLog) (*common.Conn
kernelLogs = append(kernelLogs, event)
}
- return connection, kernelLogs, protocolLog.Protocol, false
+ return connection, kernelLogs, protocolLog.ProtocolLog(), false
}
-func (r *Runner) buildKernelLog(kernelLog *common.KernelLog)
(*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) {
- pid, _ := events.ParseConnectionID(kernelLog.Event.GetConnectionID())
+func (r *Runner) buildKernelLog(kernelLog common.KernelLog)
(*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) {
+ pid, _ := events.ParseConnectionID(kernelLog.Event().GetConnectionID())
// if the process not monitoring, then ignore it
if !r.shouldReportProcessLog(pid) {
return nil, nil, false
}
- connection := r.context.ConnectionMgr.Find(kernelLog.Event)
+ connection := r.context.ConnectionMgr.Find(kernelLog.Event())
if connection == nil {
// if the connection cannot be found, it means that the
connection have not been established
// just re-add into the queue for checking in the next period
- if time.Since(kernelLog.Event.Timestamp()) >
kernelAccessLogCacheTime {
+ if time.Since(kernelLog.Event().Timestamp()) >
kernelAccessLogCacheTime {
return nil, nil, false
}
return nil, nil, true
}
- event := forwarder.BuildKernelLogFromEvent(kernelLog.Type,
kernelLog.Event)
+ event := forwarder.BuildKernelLogFromEvent(kernelLog.Type(),
kernelLog.Event())
return connection, event, false
}