This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/tmp_disable_reading by this
push:
new afca168 add sender
afca168 is described below
commit afca168252ca04650d073caab72640eb8cfc1635
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 21 20:47:48 2024 +0800
add sender
---
pkg/accesslog/common/connection.go | 58 +++++++---
pkg/accesslog/runner.go | 161 +++-----------------------
pkg/accesslog/sender/logs.go | 106 +++++++++++++++++
pkg/accesslog/sender/sender.go | 231 +++++++++++++++++++++++++++++++++++++
4 files changed, 399 insertions(+), 157 deletions(-)
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index 195fd08..e24a5cb 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -57,6 +57,9 @@ const (
// in case the reading the data from BPF queue is disordered, so add a
delay time to delete the connection information
connectionDeleteDelayTime = time.Second * 20
+
+ // the connection check exist time
+ connectionCheckExistTime = time.Second * 30
)
type addressProcessType int
@@ -144,13 +147,14 @@ type addressInfo struct {
}
type ConnectionInfo struct {
- ConnectionID uint64
- RandomID uint64
- RPCConnection *v3.AccessLogConnection
- MarkDeletable bool
- PID uint32
- Socket *ip.SocketPair
- DeleteAfter *time.Time
+ ConnectionID uint64
+ RandomID uint64
+ RPCConnection *v3.AccessLogConnection
+ MarkDeletable bool
+ PID uint32
+ Socket *ip.SocketPair
+ LastCheckExistTime time.Time
+ DeleteAfter *time.Time
}
func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
@@ -362,11 +366,12 @@ func (c *ConnectionManager) buildConnection(event
*events.SocketConnectEvent, so
Protocol: v3.AccessLogProtocolType_TCP,
}
return &ConnectionInfo{
- ConnectionID: event.ConID,
- RandomID: event.RandomID,
- RPCConnection: connection,
- PID: event.PID,
- Socket: socket,
+ ConnectionID: event.ConID,
+ RandomID: event.RandomID,
+ RPCConnection: connection,
+ PID: event.PID,
+ Socket: socket,
+ LastCheckExistTime: time.Now(),
}
}
@@ -556,6 +561,30 @@ func (c *ConnectionManager)
shouldMonitorProcesses(entities []api.ProcessInterfa
return c.monitorFilter.ShouldIncludeProcesses(entities)
}
+func (c *ConnectionManager) checkConnectionIsExist(con *ConnectionInfo) bool {
+ // skip the check if the check time is not reach
+ if time.Since(con.LastCheckExistTime) < connectionCheckExistTime {
+ return true
+ }
+ con.LastCheckExistTime = time.Now()
+ var activateConn ActiveConnection
+ c.activeConnectionMap.Lookup(con.ConnectionID, &activateConn)
+ if err := c.activeConnectionMap.Lookup(con.ConnectionID,
&activateConn); err != nil {
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ con.MarkDeletable = true
+ return false
+ }
+ log.Warnf("cannot found the active connection: %d-%d, err: %v",
con.ConnectionID, con.RandomID, err)
+ return false
+ } else if activateConn.RandomID != 0 && activateConn.RandomID !=
con.RandomID {
+ log.Debugf("detect the connection: %d-%d is already closed(by
difference random ID), so remove from the connection manager",
+ con.ConnectionID, con.RandomID)
+ con.MarkDeletable = true
+ return false
+ }
+ return true
+}
+
func (c *ConnectionManager) RemoveProcess(pid int32, entities
[]api.ProcessInterface) {
c.monitoringProcessLock.Lock()
defer c.monitoringProcessLock.Unlock()
@@ -634,7 +663,7 @@ func (c *ConnectionManager)
updateMonitorStatusForProcess(pid int32, monitor boo
func (c *ConnectionManager) OnBuildConnectionLogFinished() {
// delete all connections which marked as deletable
// all deletable connection events been sent
- deletableConnections := make(map[string]bool, 0)
+ deletableConnections := make(map[string]bool)
now := time.Now()
c.connections.IterCb(func(key string, v interface{}) {
con, ok := v.(*ConnectionInfo)
@@ -643,6 +672,9 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
}
// already mark as deletable or process not monitoring
shouldDelete := con.MarkDeletable ||
!c.ProcessIsMonitor(con.PID)
+ if !shouldDelete {
+ shouldDelete = !c.checkConnectionIsExist(con)
+ }
if shouldDelete && con.DeleteAfter == nil {
deleteAfterTime := now.Add(connectionDeleteDelayTime)
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index 240804e..95718bb 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -32,14 +32,11 @@ import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
+ "github.com/apache/skywalking-rover/pkg/accesslog/sender"
"github.com/apache/skywalking-rover/pkg/core"
"github.com/apache/skywalking-rover/pkg/core/backend"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
- "github.com/apache/skywalking-rover/pkg/process"
- "github.com/apache/skywalking-rover/pkg/tools/host"
-
- v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
@@ -53,8 +50,8 @@ type Runner struct {
mgr *module.Manager
backendOp backend.Operator
cluster string
- alsClient v3.EBPFAccessLogServiceClient
ctx context.Context
+ sender *sender.GRPCSender
}
func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) {
@@ -70,17 +67,18 @@ func NewRunner(mgr *module.Manager, config *common.Config)
(*Runner, error) {
backendOP := coreModule.BackendOperator()
clusterName := coreModule.ClusterName()
monitorFilter :=
common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","),
strings.Split(config.ExcludeClusters, ","))
+ connectionMgr := common.NewConnectionManager(config, mgr, bpfLoader,
monitorFilter)
runner := &Runner{
context: &common.AccessLogContext{
BPF: bpfLoader,
Config: config,
- ConnectionMgr: common.NewConnectionManager(config, mgr,
bpfLoader, monitorFilter),
+ ConnectionMgr: connectionMgr,
},
collectors: collector.Collectors(),
mgr: mgr,
backendOp: backendOP,
cluster: clusterName,
- alsClient:
v3.NewEBPFAccessLogServiceClient(backendOP.GetConnection()),
+ sender: sender.NewGRPCSender(mgr, connectionMgr),
}
runner.context.Queue = common.NewQueue(config.Flush.MaxCountOneStream,
flushDuration, runner)
return runner, nil
@@ -91,6 +89,7 @@ func (r *Runner) Start(ctx context.Context) error {
r.context.RuntimeContext = ctx
r.context.Queue.Start(ctx)
r.context.ConnectionMgr.Start(ctx, r.context)
+ r.sender.Start(ctx)
for _, c := range r.collectors {
err := c.Start(r.mgr, r.context)
if err != nil {
@@ -111,26 +110,20 @@ func (r *Runner) Consume(kernels chan *common.KernelLog,
protocols chan *common.
return
}
- allLogs := r.buildConnectionLogs(kernels, protocols)
- log.Debugf("ready to send access log, connection count: %d",
len(allLogs))
- if len(allLogs) == 0 {
- return
- }
- if err := r.sendLogs(allLogs); err != nil {
- log.Warnf("send access log failure: %v", err)
- }
+ batch := r.sender.NewBatch()
+ r.buildConnectionLogs(batch, kernels, protocols)
+ log.Debugf("ready to send access log, connection count: %d",
batch.ConnectionCount())
+ r.sender.AddBatch(batch)
}
-func (r *Runner) buildConnectionLogs(kernels chan *common.KernelLog, protocols
chan *common.ProtocolLog) map[*common.ConnectionInfo]*connectionLogs {
- result := make(map[*common.ConnectionInfo]*connectionLogs)
- r.buildKernelLogs(kernels, result)
- r.buildProtocolLogs(protocols, result)
+func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan
*common.KernelLog, protocols chan *common.ProtocolLog) {
+ r.buildKernelLogs(kernels, batch)
+ r.buildProtocolLogs(protocols, batch)
r.context.ConnectionMgr.OnBuildConnectionLogFinished()
- return result
}
-func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, result
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch
*sender.BatchLogs) {
delayAppends := make([]*common.KernelLog, 0)
for {
select {
@@ -139,13 +132,7 @@ func (r *Runner) buildKernelLogs(kernels chan
*common.KernelLog, result map[*com
log.Debugf("building kernel log result, connetaion ID:
%d, random ID: %d, exist connection: %t, delay: %t",
kernelLog.Event.GetConnectionID(),
kernelLog.Event.GetRandomID(), connection != nil, delay)
if connection != nil && curLog != nil {
- logs, exist := result[connection]
- if !exist {
- logs = newConnectionLogs()
- result[connection] = logs
- }
-
- logs.kernels = append(logs.kernels, curLog)
+ batch.AppendKernelLog(connection, curLog)
} else if delay {
delayAppends = append(delayAppends, kernelLog)
}
@@ -162,7 +149,7 @@ func (r *Runner) buildKernelLogs(kernels chan
*common.KernelLog, result map[*com
}
}
-func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, result
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch
*sender.BatchLogs) {
delayAppends := make([]*common.ProtocolLog, 0)
for {
select {
@@ -178,15 +165,7 @@ func (r *Runner) buildProtocolLogs(protocols chan
*common.ProtocolLog, result ma
conID, randomID, connection != nil,
delay)
}
if connection != nil && len(kernelLogs) > 0 &&
protocolLogs != nil {
- logs, exist := result[connection]
- if !exist {
- logs = newConnectionLogs()
- result[connection] = logs
- }
- logs.protocols = append(logs.protocols,
&connectionProtocolLog{
- kernels: kernelLogs,
- protocol: protocolLogs,
- })
+ batch.AppendProtocolLog(connection,
protocolLogs)
} else if delay {
delayAppends = append(delayAppends, protocolLog)
}
@@ -203,95 +182,6 @@ func (r *Runner) buildProtocolLogs(protocols chan
*common.ProtocolLog, result ma
}
}
-func (r *Runner) sendLogs(allLogs map[*common.ConnectionInfo]*connectionLogs)
error {
- timeout, cancelFunc := context.WithTimeout(r.ctx, time.Second*30)
- defer cancelFunc()
- streaming, err := r.alsClient.Collect(timeout)
- if err != nil {
- return err
- }
-
- firstLog := true
- firstConnection := true
- for connection, logs := range allLogs {
- if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
- continue
- }
- if log.Enable(logrus.DebugLevel) {
- log.Debugf("ready to sending access log with
connection, connection ID: %d, random ID: %d, "+
- "local: %s, remote: %s, role: %s, kernel logs
count: %d, protocol log count: %d",
- connection.ConnectionID, connection.RandomID,
connection.RPCConnection.Local, connection.RPCConnection.Remote,
- connection.RPCConnection.Role,
len(logs.kernels), len(logs.protocols))
- }
-
- if len(logs.kernels) > 0 {
- r.sendLogToTheStream(streaming,
r.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels,
nil))
- firstLog, firstConnection = false, false
- }
- for _, protocolLog := range logs.protocols {
- r.sendLogToTheStream(streaming,
r.buildAccessLogMessage(firstLog, firstConnection, connection,
protocolLog.kernels, protocolLog.protocol))
- firstLog, firstConnection = false, false
- }
-
- firstConnection = true
- }
-
- if _, err := streaming.CloseAndRecv(); err != nil {
- log.Warnf("closing the access log streaming error: %v", err)
- }
- return nil
-}
-
-func (r *Runner) sendLogToTheStream(streaming
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) {
- if err := streaming.Send(logMsg); err != nil {
- log.Warnf("send access log message failure: %v", err)
- }
-}
-
-func (r *Runner) buildAccessLogMessage(firstLog, firstConnection bool, conn
*common.ConnectionInfo,
- kernelLogs []*v3.AccessLogKernelLog, protocolLog
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
- var rpcCon *v3.AccessLogConnection
- if firstConnection {
- rpcCon = conn.RPCConnection
- }
- return &v3.EBPFAccessLogMessage{
- Node: r.BuildNodeInfo(firstLog),
- Connection: rpcCon,
- KernelLogs: kernelLogs,
- ProtocolLog: protocolLog,
- }
-}
-
-func (r *Runner) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
- if !needs {
- return nil
- }
- netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
- for i, n := range host.AllNetworkInterfaces() {
- netInterfaces = append(netInterfaces,
&v3.EBPFAccessLogNodeNetInterface{
- Index: int32(i),
- Mtu: int32(n.MTU),
- Name: n.Name,
- })
- }
- return &v3.EBPFAccessLogNodeInfo{
- Name:
r.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
- NetInterfaces: netInterfaces,
- BootTime: r.convertTimeToInstant(host.BootTime),
- ClusterName: r.cluster,
- Policy: &v3.EBPFAccessLogPolicy{
- ExcludeNamespaces:
r.context.ConnectionMgr.GetExcludeNamespaces(),
- },
- }
-}
-
-func (r *Runner) convertTimeToInstant(t time.Time) *v32.Instant {
- return &v32.Instant{
- Seconds: t.Unix(),
- Nanos: int32(t.Nanosecond()),
- }
-}
-
func (r *Runner) shouldReportProcessLog(pid uint32) bool {
// if the process not monitoring, then check the process is existed or
not
if r.context.ConnectionMgr.ProcessIsMonitor(pid) {
@@ -364,20 +254,3 @@ func (r *Runner) Stop() error {
r.context.ConnectionMgr.Stop()
return nil
}
-
-type connectionLogs struct {
- kernels []*v3.AccessLogKernelLog
- protocols []*connectionProtocolLog
-}
-
-type connectionProtocolLog struct {
- kernels []*v3.AccessLogKernelLog
- protocol *v3.AccessLogProtocolLogs
-}
-
-func newConnectionLogs() *connectionLogs {
- return &connectionLogs{
- kernels: make([]*v3.AccessLogKernelLog, 0),
- protocols: make([]*connectionProtocolLog, 0),
- }
-}
diff --git a/pkg/accesslog/sender/logs.go b/pkg/accesslog/sender/logs.go
new file mode 100644
index 0000000..d381671
--- /dev/null
+++ b/pkg/accesslog/sender/logs.go
@@ -0,0 +1,106 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+ "github.com/apache/skywalking-rover/pkg/accesslog/common"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var maxLogsPerSend = 10_000
+
+type BatchLogs struct {
+ logs map[*common.ConnectionInfo]*ConnectionLogs
+}
+
+func newBatchLogs() *BatchLogs {
+ return &BatchLogs{
+ logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+ }
+}
+
+func (l *BatchLogs) ConnectionCount() int {
+ return len(l.logs)
+}
+
+func (l *BatchLogs) AppendKernelLog(connection *common.ConnectionInfo, log
*v3.AccessLogKernelLog) {
+ logs, ok := l.logs[connection]
+ if !ok {
+ logs = newConnectionLogs()
+ l.logs[connection] = logs
+ }
+
+ logs.kernels = append(logs.kernels, log)
+}
+
+func (l *BatchLogs) AppendProtocolLog(connection *common.ConnectionInfo, log
*v3.AccessLogProtocolLogs) {
+ logs, ok := l.logs[connection]
+ if !ok {
+ logs = newConnectionLogs()
+ l.logs[connection] = logs
+ }
+
+ logs.protocols = append(logs.protocols, &ConnectionProtocolLog{
+ protocol: log,
+ })
+}
+
+func (l *BatchLogs) splitBatchLogs() []*BatchLogs {
+ logsCount := len(l.logs)
+ if logsCount == 0 {
+ return nil
+ }
+ splitCount := logsCount / maxLogsPerSend
+ if logsCount%maxLogsPerSend != 0 {
+ splitCount++
+ }
+ result := make([]*BatchLogs, 0, splitCount)
+
+ // split the connections by maxLogsPerSend
+ currentCount := 0
+ var currentBatch *BatchLogs
+ for connection, logs := range l.logs {
+ if currentCount == 0 {
+ currentBatch = newBatchLogs()
+ result = append(result, currentBatch)
+ currentCount = 0
+ }
+ currentBatch.logs[connection] = logs
+ currentCount++
+ }
+
+ return result
+}
+
+type ConnectionLogs struct {
+ kernels []*v3.AccessLogKernelLog
+ protocols []*ConnectionProtocolLog
+}
+
+type ConnectionProtocolLog struct {
+ kernels []*v3.AccessLogKernelLog
+ protocol *v3.AccessLogProtocolLogs
+}
+
+func newConnectionLogs() *ConnectionLogs {
+ return &ConnectionLogs{
+ kernels: make([]*v3.AccessLogKernelLog, 0),
+ protocols: make([]*ConnectionProtocolLog, 0),
+ }
+}
diff --git a/pkg/accesslog/sender/sender.go b/pkg/accesslog/sender/sender.go
new file mode 100644
index 0000000..9cc5b2f
--- /dev/null
+++ b/pkg/accesslog/sender/sender.go
@@ -0,0 +1,231 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+ "container/list"
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/accesslog/common"
+ "github.com/apache/skywalking-rover/pkg/core"
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process"
+ "github.com/apache/skywalking-rover/pkg/tools/host"
+
+ "github.com/sirupsen/logrus"
+
+ v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var log = logger.GetLogger("accesslog", "sender")
+
+// GRPCSender Async to sending the access log to the backend
+type GRPCSender struct {
+ logs *list.List
+ notify chan bool
+ mutex sync.Mutex
+ ctx context.Context
+
+ mgr *module.Manager
+ connectionMgr *common.ConnectionManager
+ alsClient v3.EBPFAccessLogServiceClient
+ clusterName string
+}
+
+// NewGRPCSender creates a new GRPCSender
+func NewGRPCSender(mgr *module.Manager, connectionMgr
*common.ConnectionManager) *GRPCSender {
+ return &GRPCSender{
+ logs: list.New(),
+ notify: make(chan bool, 1),
+ mgr: mgr,
+ connectionMgr: connectionMgr,
+ clusterName:
mgr.FindModule(core.ModuleName).(core.Operator).ClusterName(),
+ alsClient:
v3.NewEBPFAccessLogServiceClient(mgr.FindModule(core.ModuleName).(core.Operator).
+ BackendOperator().GetConnection()),
+ }
+}
+
+func (g *GRPCSender) Start(ctx context.Context) {
+ g.ctx = ctx
+ go func() {
+ for {
+ select {
+ case <-g.notify:
+ if count, err := g.handleLogs(); err != nil {
+ log.Warnf("sending access log error,
lost %d logs, error: %v", count, err)
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+func (g *GRPCSender) NewBatch() *BatchLogs {
+ return &BatchLogs{
+ logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+ }
+}
+
+func (g *GRPCSender) AddBatch(batch *BatchLogs) {
+ // split logs
+ splitLogs := batch.splitBatchLogs()
+
+ // append the resend logs
+ g.mutex.Lock()
+ defer g.mutex.Unlock()
+ for _, l := range splitLogs {
+ g.logs.PushBack(l)
+ }
+
+ // notify the sender
+ select {
+ case g.notify <- true:
+ default:
+ }
+}
+
+func (g *GRPCSender) handleLogs() (int, error) {
+ for {
+ // pop logs
+ logs := g.popLogs()
+ if logs == nil {
+ return 0, nil
+ }
+ // send logs
+ if err := g.sendLogs(logs); err != nil {
+ return len(logs.logs), err
+ }
+ }
+}
+
+func (g *GRPCSender) sendLogs(batch *BatchLogs) error {
+ timeout, cancelFunc := context.WithTimeout(g.ctx, time.Second*20)
+ defer cancelFunc()
+ streaming, err := g.alsClient.Collect(timeout)
+ if err != nil {
+ return err
+ }
+
+ firstLog := true
+ firstConnection := true
+ var sendError error
+ for connection, logs := range batch.logs {
+ if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
+ continue
+ }
+ if log.Enable(logrus.DebugLevel) {
+ log.Debugf("ready to sending access log with
connection, connection ID: %d, random ID: %d, "+
+ "local: %s, remote: %s, role: %s, contains
ztunnel address: %t, kernel logs count: %d, protocol log count: %d",
+ connection.ConnectionID, connection.RandomID,
connection.RPCConnection.Local, connection.RPCConnection.Remote,
+ connection.RPCConnection.Role,
connection.RPCConnection.Attachment != nil, len(logs.kernels),
len(logs.protocols))
+ }
+
+ if len(logs.kernels) > 0 {
+ sendError = g.sendLogToTheStream(streaming,
g.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels,
nil))
+ firstLog, firstConnection = false, false
+ }
+ for _, protocolLog := range logs.protocols {
+ sendError = g.sendLogToTheStream(streaming,
g.buildAccessLogMessage(firstLog, firstConnection, connection,
protocolLog.kernels, protocolLog.protocol))
+ firstLog, firstConnection = false, false
+ }
+ if sendError != nil {
+ g.closeStream(streaming)
+ return fmt.Errorf("sending access log error: %v",
sendError)
+ }
+
+ firstConnection = true
+ }
+
+ g.closeStream(streaming)
+ return nil
+}
+
+func (g *GRPCSender) closeStream(s v3.EBPFAccessLogService_CollectClient) {
+ if _, err := s.CloseAndRecv(); err != nil {
+ log.Warnf("closing the access log streaming error: %v", err)
+ }
+}
+
+func (g *GRPCSender) sendLogToTheStream(streaming
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) error {
+ if err := streaming.Send(logMsg); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (g *GRPCSender) buildAccessLogMessage(firstLog, firstConnection bool,
conn *common.ConnectionInfo,
+ kernelLogs []*v3.AccessLogKernelLog, protocolLog
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
+ var rpcCon *v3.AccessLogConnection
+ if firstConnection {
+ rpcCon = conn.RPCConnection
+ }
+ return &v3.EBPFAccessLogMessage{
+ Node: g.BuildNodeInfo(firstLog),
+ Connection: rpcCon,
+ KernelLogs: kernelLogs,
+ ProtocolLog: protocolLog,
+ }
+}
+
+func (g *GRPCSender) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
+ if !needs {
+ return nil
+ }
+ netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
+ for i, n := range host.AllNetworkInterfaces() {
+ netInterfaces = append(netInterfaces,
&v3.EBPFAccessLogNodeNetInterface{
+ Index: int32(i),
+ Mtu: int32(n.MTU),
+ Name: n.Name,
+ })
+ }
+ return &v3.EBPFAccessLogNodeInfo{
+ Name:
g.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
+ NetInterfaces: netInterfaces,
+ BootTime: g.convertTimeToInstant(host.BootTime),
+ ClusterName: g.clusterName,
+ Policy: &v3.EBPFAccessLogPolicy{
+ ExcludeNamespaces:
g.connectionMgr.GetExcludeNamespaces(),
+ },
+ }
+}
+
+func (g *GRPCSender) convertTimeToInstant(t time.Time) *v32.Instant {
+ return &v32.Instant{
+ Seconds: t.Unix(),
+ Nanos: int32(t.Nanosecond()),
+ }
+}
+
+func (g *GRPCSender) popLogs() *BatchLogs {
+ g.mutex.Lock()
+ defer g.mutex.Unlock()
+ if g.logs.Len() == 0 {
+ return nil
+ }
+ e := g.logs.Front()
+ logs := e.Value.(*BatchLogs)
+ g.logs.Remove(e)
+ return logs
+}