This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch log-wrapper
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit 5198c2c6e55e7dd9d32dcaa1c7bb2d8d9458e61c
Author: mrproliu <[email protected]>
AuthorDate: Thu Mar 6 10:22:49 2025 +0800

    Refactor the kernel and protocol logs sent to the backend
---
 pkg/accesslog/collector/protocols/http1.go         |  4 +-
 pkg/accesslog/collector/protocols/http2.go         |  4 +-
 pkg/accesslog/common/connection.go                 |  2 +-
 .../{forwarder/close.go => common/logs.go}         | 53 ++++++++++++++--------
 pkg/accesslog/common/queue.go                      | 36 ++++++---------
 pkg/accesslog/forwarder/close.go                   |  2 +-
 pkg/accesslog/forwarder/connect.go                 |  4 +-
 pkg/accesslog/forwarder/protocol.go                |  7 +--
 pkg/accesslog/forwarder/transfer.go                |  2 +-
 pkg/accesslog/runner.go                            | 39 ++++++++--------
 10 files changed, 80 insertions(+), 73 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index 695e9c4..6af2965 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -242,7 +242,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics 
*HTTP1Metrics, connection *Partit
        if host == "" && originalRequest.URL != nil {
                host = originalRequest.URL.Host
        }
-       forwarder.SendTransferProtocolEvent(p.ctx, details, 
&v3.AccessLogProtocolLogs{
+       forwarder.SendTransferProtocolEvent(p.ctx, 
common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
                                StartTime: 
forwarder.BuildOffsetTimestamp(details[0].GetStartTime()),
@@ -265,7 +265,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics 
*HTTP1Metrics, connection *Partit
                                },
                        },
                },
-       })
+       }))
        return nil
 }
 
diff --git a/pkg/accesslog/collector/protocols/http2.go 
b/pkg/accesslog/collector/protocols/http2.go
index 0ccdf00..1d701ce 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -263,7 +263,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_ 
*PartitionConnection, stream *HTTP2S
        if streamHost == "" {
                streamHost = stream.ReqHeader[":host"]
        }
-       forwarder.SendTransferProtocolEvent(r.ctx, details, 
&v3.AccessLogProtocolLogs{
+       forwarder.SendTransferProtocolEvent(r.ctx, 
common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
                                StartTime: 
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, 
details[0]).GetStartTime()),
@@ -286,7 +286,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_ 
*PartitionConnection, stream *HTTP2S
                                },
                        },
                },
-       })
+       }))
        return nil
 }
 
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index fddc2f3..d4c8ef2 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -194,7 +194,7 @@ func (c *ConnectionManager) Start(ctx context.Context, 
accessLogContext *AccessL
                                                SocketFD:     
activateConn.SocketFD,
                                                Success:      0,
                                        })
-                                       
accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent)
+                                       
accessLogContext.Queue.AppendKernelLog(NewKernelLogEvent(LogTypeClose, 
wapperedEvent))
                                }
 
                        case <-ctx.Done():
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/common/logs.go
similarity index 52%
copy from pkg/accesslog/forwarder/close.go
copy to pkg/accesslog/common/logs.go
index d949588..3355941 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/common/logs.go
@@ -15,35 +15,50 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package forwarder
+package common
 
 import (
-       "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/accesslog/events"
 
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
-func init() {
-       RegisterKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
+type kernelLogEvent struct {
+       logType LogType
+       event   events.Event
 }
 
-func SendCloseEvent(context *common.AccessLogContext, event 
*common.CloseEventWithNotify) {
-       context.Queue.AppendKernelLog(common.LogTypeClose, event)
+func NewKernelLogEvent(logType LogType, event events.Event) KernelLog {
+       return &kernelLogEvent{
+               logType: logType,
+               event:   event,
+       }
 }
 
-func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
-       closeEvent := event.(*common.CloseEventWithNotify)
-       if closeEvent.StartTime == 0 {
-               return nil
-       }
-       closeOp := &v3.AccessLogKernelCloseOperation{}
-       closeOp.StartTime = BuildOffsetTimestamp(closeEvent.StartTime)
-       closeOp.EndTime = BuildOffsetTimestamp(closeEvent.EndTime)
-       closeOp.Success = closeEvent.Success == 1
-       return &v3.AccessLogKernelLog{
-               Operation: &v3.AccessLogKernelLog_Close{
-                       Close: closeOp,
-               },
+func (k *kernelLogEvent) Type() LogType {
+       return k.logType
+}
+
+func (k *kernelLogEvent) Event() events.Event {
+       return k.event
+}
+
+type ProtocolEventData struct {
+       KernelLogs      []events.SocketDetail
+       ProtocolLogData *v3.AccessLogProtocolLogs
+}
+
+func (r *ProtocolEventData) RelateKernelLogs() []events.SocketDetail {
+       return r.KernelLogs
+}
+
+func (r *ProtocolEventData) ProtocolLog() *v3.AccessLogProtocolLogs {
+       return r.ProtocolLogData
+}
+
+func NewProtocolLogEvent(kernelLogs []events.SocketDetail, protocolData 
*v3.AccessLogProtocolLogs) ProtocolLog {
+       return &ProtocolEventData{
+               KernelLogs:      kernelLogs,
+               ProtocolLogData: protocolData,
        }
 }
diff --git a/pkg/accesslog/common/queue.go b/pkg/accesslog/common/queue.go
index 131a29a..b4a9cd0 100644
--- a/pkg/accesslog/common/queue.go
+++ b/pkg/accesslog/common/queue.go
@@ -33,19 +33,19 @@ import (
 
 var log = logger.GetLogger("access_log", "common")
 
-type KernelLog struct {
-       Type  LogType
-       Event events.Event
+type KernelLog interface {
+       Type() LogType
+       Event() events.Event
 }
 
-type ProtocolLog struct {
-       KernelLogs []events.SocketDetail
-       Protocol   *v3.AccessLogProtocolLogs
+type ProtocolLog interface {
+       RelateKernelLogs() []events.SocketDetail
+       ProtocolLog() *v3.AccessLogProtocolLogs
 }
 
 type Queue struct {
-       kernelLogs   chan *KernelLog
-       protocolLogs chan *ProtocolLog
+       kernelLogs   chan KernelLog
+       protocolLogs chan ProtocolLog
 
        maxFlushCount int
        period        time.Duration
@@ -57,13 +57,13 @@ type Queue struct {
 }
 
 type QueueConsumer interface {
-       Consume(kernels chan *KernelLog, protocols chan *ProtocolLog)
+       Consume(kernels chan KernelLog, protocols chan ProtocolLog)
 }
 
 func NewQueue(maxFlushCount int, period time.Duration, consumer QueueConsumer) 
*Queue {
        return &Queue{
-               kernelLogs:    make(chan *KernelLog, maxFlushCount*3),
-               protocolLogs:  make(chan *ProtocolLog, maxFlushCount*3),
+               kernelLogs:    make(chan KernelLog, maxFlushCount*3),
+               protocolLogs:  make(chan ProtocolLog, maxFlushCount*3),
                maxFlushCount: maxFlushCount,
                period:        period,
                consumer:      consumer,
@@ -71,12 +71,9 @@ func NewQueue(maxFlushCount int, period time.Duration, 
consumer QueueConsumer) *
        }
 }
 
-func (q *Queue) AppendKernelLog(tp LogType, event events.Event) {
+func (q *Queue) AppendKernelLog(log KernelLog) {
        select {
-       case q.kernelLogs <- &KernelLog{
-               Type:  tp,
-               Event: event,
-       }:
+       case q.kernelLogs <- log:
        default:
                atomic.AddInt64(&q.dropKernelLogCount, 1)
                return
@@ -84,12 +81,9 @@ func (q *Queue) AppendKernelLog(tp LogType, event 
events.Event) {
        q.consumeIfNeed()
 }
 
-func (q *Queue) AppendProtocolLog(kernelLogs []events.SocketDetail, protocol 
*v3.AccessLogProtocolLogs) {
+func (q *Queue) AppendProtocolLog(log ProtocolLog) {
        select {
-       case q.protocolLogs <- &ProtocolLog{
-               KernelLogs: kernelLogs,
-               Protocol:   protocol,
-       }:
+       case q.protocolLogs <- log:
        default:
                atomic.AddInt64(&q.dropProtocolLogCount, 1)
                return
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go
index d949588..6d27ffa 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/forwarder/close.go
@@ -29,7 +29,7 @@ func init() {
 }
 
 func SendCloseEvent(context *common.AccessLogContext, event 
*common.CloseEventWithNotify) {
-       context.Queue.AppendKernelLog(common.LogTypeClose, event)
+       
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeClose, 
event))
 }
 
 func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
diff --git a/pkg/accesslog/forwarder/connect.go 
b/pkg/accesslog/forwarder/connect.go
index 2058d25..ba3601e 100644
--- a/pkg/accesslog/forwarder/connect.go
+++ b/pkg/accesslog/forwarder/connect.go
@@ -31,10 +31,10 @@ func init() {
 }
 
 func SendConnectEvent(context *common.AccessLogContext, event 
*events.SocketConnectEvent, socketPair *ip.SocketPair) {
-       context.Queue.AppendKernelLog(common.LogTypeConnect, 
&common.ConnectEventWithSocket{
+       
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeConnect, 
&common.ConnectEventWithSocket{
                SocketConnectEvent: event,
                SocketPair:         socketPair,
-       })
+       }))
 }
 
 func connectLogBuilder(event events.Event) *v3.AccessLogKernelLog {
diff --git a/pkg/accesslog/forwarder/protocol.go 
b/pkg/accesslog/forwarder/protocol.go
index 8e8c5b4..ac60bed 100644
--- a/pkg/accesslog/forwarder/protocol.go
+++ b/pkg/accesslog/forwarder/protocol.go
@@ -19,11 +19,8 @@ package forwarder
 
 import (
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
-       "github.com/apache/skywalking-rover/pkg/accesslog/events"
-
-       v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
-func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs 
[]events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) {
-       context.Queue.AppendProtocolLog(kernelLogs, protocolData)
+func SendTransferProtocolEvent(context *common.AccessLogContext, event 
common.ProtocolLog) {
+       context.Queue.AppendProtocolLog(event)
 }
diff --git a/pkg/accesslog/forwarder/transfer.go 
b/pkg/accesslog/forwarder/transfer.go
index 44c0d4e..22d7fb1 100644
--- a/pkg/accesslog/forwarder/transfer.go
+++ b/pkg/accesslog/forwarder/transfer.go
@@ -30,7 +30,7 @@ func init() {
 }
 
 func SendTransferNoProtocolEvent(context *common.AccessLogContext, event 
events.SocketDetail) {
-       context.Queue.AppendKernelLog(common.LogTypeKernelTransfer, event)
+       
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeKernelTransfer,
 event))
 }
 
 func kernelTransferLogBuilder(event events.Event) *v3.AccessLogKernelLog {
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index d483172..5dce05d 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -105,7 +105,7 @@ func (r *Runner) Start(ctx context.Context) error {
        return nil
 }
 
-func (r *Runner) Consume(kernels chan *common.KernelLog, protocols chan 
*common.ProtocolLog) {
+func (r *Runner) Consume(kernels chan common.KernelLog, protocols chan 
common.ProtocolLog) {
        if r.backendOp.GetConnectionStatus() != backend.Connected {
                log.Warnf("failure to connect to the backend, skip generating 
access log")
                return
@@ -117,21 +117,21 @@ func (r *Runner) Consume(kernels chan *common.KernelLog, 
protocols chan *common.
        r.sender.AddBatch(batch)
 }
 
-func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan 
*common.KernelLog, protocols chan *common.ProtocolLog) {
+func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan 
common.KernelLog, protocols chan common.ProtocolLog) {
        r.buildKernelLogs(kernels, batch)
        r.buildProtocolLogs(protocols, batch)
 
        r.context.ConnectionMgr.OnBuildConnectionLogFinished()
 }
 
-func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch 
*sender.BatchLogs) {
-       delayAppends := make([]*common.KernelLog, 0)
+func (r *Runner) buildKernelLogs(kernels chan common.KernelLog, batch 
*sender.BatchLogs) {
+       delayAppends := make([]common.KernelLog, 0)
        for {
                select {
                case kernelLog := <-kernels:
                        connection, curLog, delay := r.buildKernelLog(kernelLog)
                        log.Debugf("building kernel log result, connetaion ID: 
%d, random ID: %d, exist connection: %t, delay: %t",
-                               kernelLog.Event.GetConnectionID(), 
kernelLog.Event.GetRandomID(), connection != nil, delay)
+                               kernelLog.Event().GetConnectionID(), 
kernelLog.Event().GetRandomID(), connection != nil, delay)
                        if connection != nil && curLog != nil {
                                batch.AppendKernelLog(connection, curLog)
                        } else if delay {
@@ -150,17 +150,18 @@ func (r *Runner) buildKernelLogs(kernels chan 
*common.KernelLog, batch *sender.B
        }
 }
 
-func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch 
*sender.BatchLogs) {
-       delayAppends := make([]*common.ProtocolLog, 0)
+func (r *Runner) buildProtocolLogs(protocols chan common.ProtocolLog, batch 
*sender.BatchLogs) {
+       delayAppends := make([]common.ProtocolLog, 0)
        for {
                select {
                case protocolLog := <-protocols:
                        connection, kernelLogs, protocolLogs, delay := 
r.buildProtocolLog(protocolLog)
                        if log.Enable(logrus.DebugLevel) {
-                               kernelLogCount := len(protocolLog.KernelLogs)
+                               kernelLogCount := 
len(protocolLog.RelateKernelLogs())
                                var conID, randomID uint64
                                if kernelLogCount > 0 {
-                                       conID, randomID = 
protocolLog.KernelLogs[0].GetConnectionID(), 
protocolLog.KernelLogs[0].GetRandomID()
+                                       conID, randomID = 
protocolLog.RelateKernelLogs()[0].GetConnectionID(),
+                                               
protocolLog.RelateKernelLogs()[0].GetRandomID()
                                }
                                log.Debugf("building protocol log result, 
connetaion ID: %d, random ID: %d, connection exist: %t, delay: %t",
                                        conID, randomID, connection != nil, 
delay)
@@ -200,12 +201,12 @@ func (r *Runner) shouldReportProcessLog(pid uint32) bool {
        return true
 }
 
-func (r *Runner) buildProtocolLog(protocolLog *common.ProtocolLog) 
(*common.ConnectionInfo,
+func (r *Runner) buildProtocolLog(protocolLog common.ProtocolLog) 
(*common.ConnectionInfo,
        []*v3.AccessLogKernelLog, *v3.AccessLogProtocolLogs, bool) {
-       if len(protocolLog.KernelLogs) == 0 {
+       if len(protocolLog.RelateKernelLogs()) == 0 {
                return nil, nil, nil, false
        }
-       firstKernelLog := protocolLog.KernelLogs[0]
+       firstKernelLog := protocolLog.RelateKernelLogs()[0]
        pid, _ := events.ParseConnectionID(firstKernelLog.GetConnectionID())
        // if the process not monitoring, then ignore it
        if !r.shouldReportProcessLog(pid) {
@@ -221,7 +222,7 @@ func (r *Runner) buildProtocolLog(protocolLog 
*common.ProtocolLog) (*common.Conn
                return nil, nil, nil, true
        }
        kernelLogs := make([]*v3.AccessLogKernelLog, 0)
-       for _, kl := range protocolLog.KernelLogs {
+       for _, kl := range protocolLog.RelateKernelLogs() {
                event := 
forwarder.BuildKernelLogFromEvent(common.LogTypeKernelTransfer, kl)
                if event == nil {
                        continue
@@ -229,25 +230,25 @@ func (r *Runner) buildProtocolLog(protocolLog 
*common.ProtocolLog) (*common.Conn
                kernelLogs = append(kernelLogs, event)
        }
 
-       return connection, kernelLogs, protocolLog.Protocol, false
+       return connection, kernelLogs, protocolLog.ProtocolLog(), false
 }
 
-func (r *Runner) buildKernelLog(kernelLog *common.KernelLog) 
(*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) {
-       pid, _ := events.ParseConnectionID(kernelLog.Event.GetConnectionID())
+func (r *Runner) buildKernelLog(kernelLog common.KernelLog) 
(*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) {
+       pid, _ := events.ParseConnectionID(kernelLog.Event().GetConnectionID())
        // if the process not monitoring, then ignore it
        if !r.shouldReportProcessLog(pid) {
                return nil, nil, false
        }
-       connection := r.context.ConnectionMgr.Find(kernelLog.Event)
+       connection := r.context.ConnectionMgr.Find(kernelLog.Event())
        if connection == nil {
                // if the connection cannot be found, it means that the 
connection have not been established
                // just re-add into the queue for checking in the next period
-               if time.Since(kernelLog.Event.Timestamp()) > 
kernelAccessLogCacheTime {
+               if time.Since(kernelLog.Event().Timestamp()) > 
kernelAccessLogCacheTime {
                        return nil, nil, false
                }
                return nil, nil, true
        }
-       event := forwarder.BuildKernelLogFromEvent(kernelLog.Type, 
kernelLog.Event)
+       event := forwarder.BuildKernelLogFromEvent(kernelLog.Type(), 
kernelLog.Event())
        return connection, event, false
 }
 

Reply via email to