This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch log-wrapper in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit 5198c2c6e55e7dd9d32dcaa1c7bb2d8d9458e61c Author: mrproliu <[email protected]> AuthorDate: Thu Mar 6 10:22:49 2025 +0800 Refactor kernel and protocol log when sending to backend --- pkg/accesslog/collector/protocols/http1.go | 4 +- pkg/accesslog/collector/protocols/http2.go | 4 +- pkg/accesslog/common/connection.go | 2 +- .../{forwarder/close.go => common/logs.go} | 53 ++++++++++++++-------- pkg/accesslog/common/queue.go | 36 ++++++--------- pkg/accesslog/forwarder/close.go | 2 +- pkg/accesslog/forwarder/connect.go | 4 +- pkg/accesslog/forwarder/protocol.go | 7 +-- pkg/accesslog/forwarder/transfer.go | 2 +- pkg/accesslog/runner.go | 39 ++++++++-------- 10 files changed, 80 insertions(+), 73 deletions(-) diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go index 695e9c4..6af2965 100644 --- a/pkg/accesslog/collector/protocols/http1.go +++ b/pkg/accesslog/collector/protocols/http1.go @@ -242,7 +242,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *Partit if host == "" && originalRequest.URL != nil { host = originalRequest.URL.Host } - forwarder.SendTransferProtocolEvent(p.ctx, details, &v3.AccessLogProtocolLogs{ + forwarder.SendTransferProtocolEvent(p.ctx, common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{ Protocol: &v3.AccessLogProtocolLogs_Http{ Http: &v3.AccessLogHTTPProtocol{ StartTime: forwarder.BuildOffsetTimestamp(details[0].GetStartTime()), @@ -265,7 +265,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *Partit }, }, }, - }) + })) return nil } diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go index 0ccdf00..1d701ce 100644 --- a/pkg/accesslog/collector/protocols/http2.go +++ b/pkg/accesslog/collector/protocols/http2.go @@ -263,7 +263,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream *HTTP2S if streamHost == "" { streamHost = stream.ReqHeader[":host"] } - forwarder.SendTransferProtocolEvent(r.ctx, details, &v3.AccessLogProtocolLogs{ + forwarder.SendTransferProtocolEvent(r.ctx, common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{ Protocol: &v3.AccessLogProtocolLogs_Http{ Http: &v3.AccessLogHTTPProtocol{ StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, details[0]).GetStartTime()), @@ -286,7 +286,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream *HTTP2S }, }, }, - }) + })) return nil } diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go index fddc2f3..d4c8ef2 100644 --- a/pkg/accesslog/common/connection.go +++ b/pkg/accesslog/common/connection.go @@ -194,7 +194,7 @@ func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessL SocketFD: activateConn.SocketFD, Success: 0, }) - accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent) + accessLogContext.Queue.AppendKernelLog(NewKernelLogEvent(LogTypeClose, wapperedEvent)) } case <-ctx.Done(): diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/common/logs.go similarity index 52% copy from pkg/accesslog/forwarder/close.go copy to pkg/accesslog/common/logs.go index d949588..3355941 100644 --- a/pkg/accesslog/forwarder/close.go +++ b/pkg/accesslog/common/logs.go @@ -15,35 +15,50 @@ // specific language governing permissions and limitations // under the License. -package forwarder +package common import ( - "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/accesslog/events" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) -func init() { - RegisterKernelLogBuilder(common.LogTypeClose, closeLogBuilder) +type kernelLogEvent struct { + logType LogType + event events.Event } -func SendCloseEvent(context *common.AccessLogContext, event *common.CloseEventWithNotify) { - context.Queue.AppendKernelLog(common.LogTypeClose, event) +func NewKernelLogEvent(logType LogType, event events.Event) KernelLog { + return &kernelLogEvent{ + logType: logType, + event: event, + } } -func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog { - closeEvent := event.(*common.CloseEventWithNotify) - if closeEvent.StartTime == 0 { - return nil - } - closeOp := &v3.AccessLogKernelCloseOperation{} - closeOp.StartTime = BuildOffsetTimestamp(closeEvent.StartTime) - closeOp.EndTime = BuildOffsetTimestamp(closeEvent.EndTime) - closeOp.Success = closeEvent.Success == 1 - return &v3.AccessLogKernelLog{ - Operation: &v3.AccessLogKernelLog_Close{ - Close: closeOp, - }, +func (k *kernelLogEvent) Type() LogType { + return k.logType +} + +func (k *kernelLogEvent) Event() events.Event { + return k.event +} + +type ProtocolEventData struct { + KernelLogs []events.SocketDetail + ProtocolLogData *v3.AccessLogProtocolLogs +} + +func (r *ProtocolEventData) RelateKernelLogs() []events.SocketDetail { + return r.KernelLogs +} + +func (r *ProtocolEventData) ProtocolLog() *v3.AccessLogProtocolLogs { + return r.ProtocolLogData +} + +func NewProtocolLogEvent(kernelLogs []events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) ProtocolLog { + return &ProtocolEventData{ + KernelLogs: kernelLogs, + ProtocolLogData: protocolData, } } diff --git a/pkg/accesslog/common/queue.go b/pkg/accesslog/common/queue.go index 131a29a..b4a9cd0 100644 --- a/pkg/accesslog/common/queue.go +++ b/pkg/accesslog/common/queue.go @@ -33,19 +33,19 @@ import ( var log = logger.GetLogger("access_log", "common") -type KernelLog struct { - Type LogType - Event events.Event +type KernelLog interface { + Type() LogType + Event() events.Event } -type ProtocolLog struct { - KernelLogs []events.SocketDetail - Protocol *v3.AccessLogProtocolLogs +type ProtocolLog interface { + RelateKernelLogs() []events.SocketDetail + ProtocolLog() *v3.AccessLogProtocolLogs } type Queue struct { - kernelLogs chan *KernelLog - protocolLogs chan *ProtocolLog + kernelLogs chan KernelLog + protocolLogs chan ProtocolLog maxFlushCount int period time.Duration @@ -57,13 +57,13 @@ type Queue struct { } type QueueConsumer interface { - Consume(kernels chan *KernelLog, protocols chan *ProtocolLog) + Consume(kernels chan KernelLog, protocols chan ProtocolLog) } func NewQueue(maxFlushCount int, period time.Duration, consumer QueueConsumer) *Queue { return &Queue{ - kernelLogs: make(chan *KernelLog, maxFlushCount*3), - protocolLogs: make(chan *ProtocolLog, maxFlushCount*3), + kernelLogs: make(chan KernelLog, maxFlushCount*3), + protocolLogs: make(chan ProtocolLog, maxFlushCount*3), maxFlushCount: maxFlushCount, period: period, consumer: consumer, @@ -71,12 +71,9 @@ func NewQueue(maxFlushCount int, period time.Duration, consumer QueueConsumer) * } } -func (q *Queue) AppendKernelLog(tp LogType, event events.Event) { +func (q *Queue) AppendKernelLog(log KernelLog) { select { - case q.kernelLogs <- &KernelLog{ - Type: tp, - Event: event, - }: + case q.kernelLogs <- log: default: atomic.AddInt64(&q.dropKernelLogCount, 1) return @@ -84,12 +81,9 @@ func (q *Queue) AppendKernelLog(tp LogType, event events.Event) { q.consumeIfNeed() } -func (q *Queue) AppendProtocolLog(kernelLogs []events.SocketDetail, protocol *v3.AccessLogProtocolLogs) { +func (q *Queue) AppendProtocolLog(log ProtocolLog) { select { - case q.protocolLogs <- &ProtocolLog{ - KernelLogs: kernelLogs, - Protocol: protocol, - }: + case q.protocolLogs <- log: default: atomic.AddInt64(&q.dropProtocolLogCount, 1) return diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go index d949588..6d27ffa 100644 --- a/pkg/accesslog/forwarder/close.go +++ b/pkg/accesslog/forwarder/close.go @@ -29,7 +29,7 @@ func init() { } func SendCloseEvent(context *common.AccessLogContext, event *common.CloseEventWithNotify) { - context.Queue.AppendKernelLog(common.LogTypeClose, event) + context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeClose, event)) } func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog { diff --git a/pkg/accesslog/forwarder/connect.go b/pkg/accesslog/forwarder/connect.go index 2058d25..ba3601e 100644 --- a/pkg/accesslog/forwarder/connect.go +++ b/pkg/accesslog/forwarder/connect.go @@ -31,10 +31,10 @@ func init() { } func SendConnectEvent(context *common.AccessLogContext, event *events.SocketConnectEvent, socketPair *ip.SocketPair) { - context.Queue.AppendKernelLog(common.LogTypeConnect, &common.ConnectEventWithSocket{ + context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeConnect, &common.ConnectEventWithSocket{ SocketConnectEvent: event, SocketPair: socketPair, - }) + })) } func connectLogBuilder(event events.Event) *v3.AccessLogKernelLog { diff --git a/pkg/accesslog/forwarder/protocol.go b/pkg/accesslog/forwarder/protocol.go index 8e8c5b4..ac60bed 100644 --- a/pkg/accesslog/forwarder/protocol.go +++ b/pkg/accesslog/forwarder/protocol.go @@ -19,11 +19,8 @@ package forwarder import ( "github.com/apache/skywalking-rover/pkg/accesslog/common" - "github.com/apache/skywalking-rover/pkg/accesslog/events" - - v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) -func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs []events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) { - context.Queue.AppendProtocolLog(kernelLogs, protocolData) +func SendTransferProtocolEvent(context *common.AccessLogContext, event common.ProtocolLog) { + context.Queue.AppendProtocolLog(event) } diff --git a/pkg/accesslog/forwarder/transfer.go b/pkg/accesslog/forwarder/transfer.go index 44c0d4e..22d7fb1 100644 --- a/pkg/accesslog/forwarder/transfer.go +++ b/pkg/accesslog/forwarder/transfer.go @@ -30,7 +30,7 @@ func init() { } func SendTransferNoProtocolEvent(context *common.AccessLogContext, event events.SocketDetail) { - context.Queue.AppendKernelLog(common.LogTypeKernelTransfer, event) + context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeKernelTransfer, event)) } func kernelTransferLogBuilder(event events.Event) *v3.AccessLogKernelLog { diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go index d483172..5dce05d 100644 --- a/pkg/accesslog/runner.go +++ b/pkg/accesslog/runner.go @@ -105,7 +105,7 @@ func (r *Runner) Start(ctx context.Context) error { return nil } -func (r *Runner) Consume(kernels chan *common.KernelLog, protocols chan *common.ProtocolLog) { +func (r *Runner) Consume(kernels chan common.KernelLog, protocols chan common.ProtocolLog) { if r.backendOp.GetConnectionStatus() != backend.Connected { log.Warnf("failure to connect to the backend, skip generating access log") return @@ -117,21 +117,21 @@ func (r *Runner) Consume(kernels chan *common.KernelLog, protocols chan *common. r.sender.AddBatch(batch) } -func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan *common.KernelLog, protocols chan *common.ProtocolLog) { +func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan common.KernelLog, protocols chan common.ProtocolLog) { r.buildKernelLogs(kernels, batch) r.buildProtocolLogs(protocols, batch) r.context.ConnectionMgr.OnBuildConnectionLogFinished() } -func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch *sender.BatchLogs) { - delayAppends := make([]*common.KernelLog, 0) +func (r *Runner) buildKernelLogs(kernels chan common.KernelLog, batch *sender.BatchLogs) { + delayAppends := make([]common.KernelLog, 0) for { select { case kernelLog := <-kernels: connection, curLog, delay := r.buildKernelLog(kernelLog) log.Debugf("building kernel log result, connetaion ID: %d, random ID: %d, exist connection: %t, delay: %t", - kernelLog.Event.GetConnectionID(), kernelLog.Event.GetRandomID(), connection != nil, delay) + kernelLog.Event().GetConnectionID(), kernelLog.Event().GetRandomID(), connection != nil, delay) if connection != nil && curLog != nil { batch.AppendKernelLog(connection, curLog) } else if delay { @@ -150,17 +150,18 @@ func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch *sender.B } } -func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch *sender.BatchLogs) { - delayAppends := make([]*common.ProtocolLog, 0) +func (r *Runner) buildProtocolLogs(protocols chan common.ProtocolLog, batch *sender.BatchLogs) { + delayAppends := make([]common.ProtocolLog, 0) for { select { case protocolLog := <-protocols: connection, kernelLogs, protocolLogs, delay := r.buildProtocolLog(protocolLog) if log.Enable(logrus.DebugLevel) { - kernelLogCount := len(protocolLog.KernelLogs) + kernelLogCount := len(protocolLog.RelateKernelLogs()) var conID, randomID uint64 if kernelLogCount > 0 { - conID, randomID = protocolLog.KernelLogs[0].GetConnectionID(), protocolLog.KernelLogs[0].GetRandomID() + conID, randomID = protocolLog.RelateKernelLogs()[0].GetConnectionID(), + protocolLog.RelateKernelLogs()[0].GetRandomID() } log.Debugf("building protocol log result, connetaion ID: %d, random ID: %d, connection exist: %t, delay: %t", conID, randomID, connection != nil, delay) @@ -200,12 +201,12 @@ func (r *Runner) shouldReportProcessLog(pid uint32) bool { return true } -func (r *Runner) buildProtocolLog(protocolLog *common.ProtocolLog) (*common.ConnectionInfo, +func (r *Runner) buildProtocolLog(protocolLog common.ProtocolLog) (*common.ConnectionInfo, []*v3.AccessLogKernelLog, *v3.AccessLogProtocolLogs, bool) { - if len(protocolLog.KernelLogs) == 0 { + if len(protocolLog.RelateKernelLogs()) == 0 { return nil, nil, nil, false } - firstKernelLog := protocolLog.KernelLogs[0] + firstKernelLog := protocolLog.RelateKernelLogs()[0] pid, _ := events.ParseConnectionID(firstKernelLog.GetConnectionID()) // if the process not monitoring, then ignore it if !r.shouldReportProcessLog(pid) { @@ -221,7 +222,7 @@ func (r *Runner) buildProtocolLog(protocolLog *common.ProtocolLog) (*common.Conn return nil, nil, nil, true } kernelLogs := make([]*v3.AccessLogKernelLog, 0) - for _, kl := range protocolLog.KernelLogs { + for _, kl := range protocolLog.RelateKernelLogs() { event := forwarder.BuildKernelLogFromEvent(common.LogTypeKernelTransfer, kl) if event == nil { continue @@ -229,25 +230,25 @@ func (r *Runner) buildProtocolLog(protocolLog *common.ProtocolLog) (*common.Conn kernelLogs = append(kernelLogs, event) } - return connection, kernelLogs, protocolLog.Protocol, false + return connection, kernelLogs, protocolLog.ProtocolLog(), false } -func (r *Runner) buildKernelLog(kernelLog *common.KernelLog) (*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) { - pid, _ := events.ParseConnectionID(kernelLog.Event.GetConnectionID()) +func (r *Runner) buildKernelLog(kernelLog common.KernelLog) (*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) { + pid, _ := events.ParseConnectionID(kernelLog.Event().GetConnectionID()) // if the process not monitoring, then ignore it if !r.shouldReportProcessLog(pid) { return nil, nil, false } - connection := r.context.ConnectionMgr.Find(kernelLog.Event) + connection := r.context.ConnectionMgr.Find(kernelLog.Event()) if connection == nil { // if the connection cannot be found, it means that the connection have not been established // just re-add into the queue for checking in the next period - if time.Since(kernelLog.Event.Timestamp()) > kernelAccessLogCacheTime { + if time.Since(kernelLog.Event().Timestamp()) > kernelAccessLogCacheTime { return nil, nil, false } return nil, nil, true } - event := forwarder.BuildKernelLogFromEvent(kernelLog.Type, kernelLog.Event) + event := forwarder.BuildKernelLogFromEvent(kernelLog.Type(), kernelLog.Event()) return connection, event, false }
