This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch tmp_disable_reading
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/tmp_disable_reading by this 
push:
     new afca168  add sender
afca168 is described below

commit afca168252ca04650d073caab72640eb8cfc1635
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 21 20:47:48 2024 +0800

    add sender
---
 pkg/accesslog/common/connection.go |  58 +++++++---
 pkg/accesslog/runner.go            | 161 +++-----------------------
 pkg/accesslog/sender/logs.go       | 106 +++++++++++++++++
 pkg/accesslog/sender/sender.go     | 231 +++++++++++++++++++++++++++++++++++++
 4 files changed, 399 insertions(+), 157 deletions(-)

diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 195fd08..e24a5cb 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -57,6 +57,9 @@ const (
 
        // in case the reading the data from BPF queue is disordered, so add a 
delay time to delete the connection information
        connectionDeleteDelayTime = time.Second * 20
+
+       // the minimum interval between two existence checks of the same connection
+       connectionCheckExistTime = time.Second * 30
 )
 
 type addressProcessType int
@@ -144,13 +147,14 @@ type addressInfo struct {
 }
 
 type ConnectionInfo struct {
-       ConnectionID  uint64
-       RandomID      uint64
-       RPCConnection *v3.AccessLogConnection
-       MarkDeletable bool
-       PID           uint32
-       Socket        *ip.SocketPair
-       DeleteAfter   *time.Time
+       ConnectionID       uint64
+       RandomID           uint64
+       RPCConnection      *v3.AccessLogConnection
+       MarkDeletable      bool
+       PID                uint32
+       Socket             *ip.SocketPair
+       LastCheckExistTime time.Time
+       DeleteAfter        *time.Time
 }
 
 func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader 
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
@@ -362,11 +366,12 @@ func (c *ConnectionManager) buildConnection(event 
*events.SocketConnectEvent, so
                Protocol: v3.AccessLogProtocolType_TCP,
        }
        return &ConnectionInfo{
-               ConnectionID:  event.ConID,
-               RandomID:      event.RandomID,
-               RPCConnection: connection,
-               PID:           event.PID,
-               Socket:        socket,
+               ConnectionID:       event.ConID,
+               RandomID:           event.RandomID,
+               RPCConnection:      connection,
+               PID:                event.PID,
+               Socket:             socket,
+               LastCheckExistTime: time.Now(),
        }
 }
 
@@ -556,6 +561,30 @@ func (c *ConnectionManager) 
shouldMonitorProcesses(entities []api.ProcessInterfa
        return c.monitorFilter.ShouldIncludeProcesses(entities)
 }
 
+// checkConnectionIsExist reports whether con is still present in the active connection map, marking it deletable when it is gone.
+func (c *ConnectionManager) checkConnectionIsExist(con *ConnectionInfo) bool {
+       // throttle: treat the connection as existing until connectionCheckExistTime has elapsed since the last check
+       if time.Since(con.LastCheckExistTime) < connectionCheckExistTime {
+               return true
+       }
+       con.LastCheckExistTime = time.Now()
+       var activateConn ActiveConnection
+       if err := c.activeConnectionMap.Lookup(con.ConnectionID, &activateConn); err != nil {
+               if errors.Is(err, ebpf.ErrKeyNotExist) {
+                       con.MarkDeletable = true
+                       return false
+               }
+               log.Warnf("cannot found the active connection: %d-%d, err: %v", con.ConnectionID, con.RandomID, err)
+               return false
+       } else if activateConn.RandomID != 0 && activateConn.RandomID != con.RandomID {
+               log.Debugf("detect the connection: %d-%d is already closed(by difference random ID), so remove from the connection manager",
+                       con.ConnectionID, con.RandomID)
+               con.MarkDeletable = true
+               return false
+       }
+       return true
+}
+
 func (c *ConnectionManager) RemoveProcess(pid int32, entities 
[]api.ProcessInterface) {
        c.monitoringProcessLock.Lock()
        defer c.monitoringProcessLock.Unlock()
@@ -634,7 +663,7 @@ func (c *ConnectionManager) 
updateMonitorStatusForProcess(pid int32, monitor boo
 func (c *ConnectionManager) OnBuildConnectionLogFinished() {
        // delete all connections which marked as deletable
        // all deletable connection events been sent
-       deletableConnections := make(map[string]bool, 0)
+       deletableConnections := make(map[string]bool)
        now := time.Now()
        c.connections.IterCb(func(key string, v interface{}) {
                con, ok := v.(*ConnectionInfo)
@@ -643,6 +672,9 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
                }
                // already mark as deletable or process not monitoring
                shouldDelete := con.MarkDeletable || 
!c.ProcessIsMonitor(con.PID)
+               if !shouldDelete {
+                       shouldDelete = !c.checkConnectionIsExist(con)
+               }
 
                if shouldDelete && con.DeleteAfter == nil {
                        deleteAfterTime := now.Add(connectionDeleteDelayTime)
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index 240804e..95718bb 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -32,14 +32,11 @@ import (
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/accesslog/events"
        "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
+       "github.com/apache/skywalking-rover/pkg/accesslog/sender"
        "github.com/apache/skywalking-rover/pkg/core"
        "github.com/apache/skywalking-rover/pkg/core/backend"
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/module"
-       "github.com/apache/skywalking-rover/pkg/process"
-       "github.com/apache/skywalking-rover/pkg/tools/host"
-
-       v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
@@ -53,8 +50,8 @@ type Runner struct {
        mgr        *module.Manager
        backendOp  backend.Operator
        cluster    string
-       alsClient  v3.EBPFAccessLogServiceClient
        ctx        context.Context
+       sender     *sender.GRPCSender
 }
 
 func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) {
@@ -70,17 +67,18 @@ func NewRunner(mgr *module.Manager, config *common.Config) 
(*Runner, error) {
        backendOP := coreModule.BackendOperator()
        clusterName := coreModule.ClusterName()
        monitorFilter := 
common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","), 
strings.Split(config.ExcludeClusters, ","))
+       connectionMgr := common.NewConnectionManager(config, mgr, bpfLoader, 
monitorFilter)
        runner := &Runner{
                context: &common.AccessLogContext{
                        BPF:           bpfLoader,
                        Config:        config,
-                       ConnectionMgr: common.NewConnectionManager(config, mgr, 
bpfLoader, monitorFilter),
+                       ConnectionMgr: connectionMgr,
                },
                collectors: collector.Collectors(),
                mgr:        mgr,
                backendOp:  backendOP,
                cluster:    clusterName,
-               alsClient:  
v3.NewEBPFAccessLogServiceClient(backendOP.GetConnection()),
+               sender:     sender.NewGRPCSender(mgr, connectionMgr),
        }
        runner.context.Queue = common.NewQueue(config.Flush.MaxCountOneStream, 
flushDuration, runner)
        return runner, nil
@@ -91,6 +89,7 @@ func (r *Runner) Start(ctx context.Context) error {
        r.context.RuntimeContext = ctx
        r.context.Queue.Start(ctx)
        r.context.ConnectionMgr.Start(ctx, r.context)
+       r.sender.Start(ctx)
        for _, c := range r.collectors {
                err := c.Start(r.mgr, r.context)
                if err != nil {
@@ -111,26 +110,20 @@ func (r *Runner) Consume(kernels chan *common.KernelLog, 
protocols chan *common.
                return
        }
 
-       allLogs := r.buildConnectionLogs(kernels, protocols)
-       log.Debugf("ready to send access log, connection count: %d", 
len(allLogs))
-       if len(allLogs) == 0 {
-               return
-       }
-       if err := r.sendLogs(allLogs); err != nil {
-               log.Warnf("send access log failure: %v", err)
-       }
+       batch := r.sender.NewBatch()
+       r.buildConnectionLogs(batch, kernels, protocols)
+       log.Debugf("ready to send access log, connection count: %d", 
batch.ConnectionCount())
+       r.sender.AddBatch(batch)
 }
 
-func (r *Runner) buildConnectionLogs(kernels chan *common.KernelLog, protocols 
chan *common.ProtocolLog) map[*common.ConnectionInfo]*connectionLogs {
-       result := make(map[*common.ConnectionInfo]*connectionLogs)
-       r.buildKernelLogs(kernels, result)
-       r.buildProtocolLogs(protocols, result)
+func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan 
*common.KernelLog, protocols chan *common.ProtocolLog) {
+       r.buildKernelLogs(kernels, batch)
+       r.buildProtocolLogs(protocols, batch)
 
        r.context.ConnectionMgr.OnBuildConnectionLogFinished()
-       return result
 }
 
-func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, result 
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch 
*sender.BatchLogs) {
        delayAppends := make([]*common.KernelLog, 0)
        for {
                select {
@@ -139,13 +132,7 @@ func (r *Runner) buildKernelLogs(kernels chan 
*common.KernelLog, result map[*com
                        log.Debugf("building kernel log result, connetaion ID: 
%d, random ID: %d, exist connection: %t, delay: %t",
                                kernelLog.Event.GetConnectionID(), 
kernelLog.Event.GetRandomID(), connection != nil, delay)
                        if connection != nil && curLog != nil {
-                               logs, exist := result[connection]
-                               if !exist {
-                                       logs = newConnectionLogs()
-                                       result[connection] = logs
-                               }
-
-                               logs.kernels = append(logs.kernels, curLog)
+                               batch.AppendKernelLog(connection, curLog)
                        } else if delay {
                                delayAppends = append(delayAppends, kernelLog)
                        }
@@ -162,7 +149,7 @@ func (r *Runner) buildKernelLogs(kernels chan 
*common.KernelLog, result map[*com
        }
 }
 
-func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, result 
map[*common.ConnectionInfo]*connectionLogs) {
+func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch 
*sender.BatchLogs) {
        delayAppends := make([]*common.ProtocolLog, 0)
        for {
                select {
@@ -178,15 +165,7 @@ func (r *Runner) buildProtocolLogs(protocols chan 
*common.ProtocolLog, result ma
                                        conID, randomID, connection != nil, 
delay)
                        }
                        if connection != nil && len(kernelLogs) > 0 && 
protocolLogs != nil {
-                               logs, exist := result[connection]
-                               if !exist {
-                                       logs = newConnectionLogs()
-                                       result[connection] = logs
-                               }
-                               logs.protocols = append(logs.protocols, 
&connectionProtocolLog{
-                                       kernels:  kernelLogs,
-                                       protocol: protocolLogs,
-                               })
+                               batch.AppendProtocolLog(connection, 
protocolLogs)
                        } else if delay {
                                delayAppends = append(delayAppends, protocolLog)
                        }
@@ -203,95 +182,6 @@ func (r *Runner) buildProtocolLogs(protocols chan 
*common.ProtocolLog, result ma
        }
 }
 
-func (r *Runner) sendLogs(allLogs map[*common.ConnectionInfo]*connectionLogs) 
error {
-       timeout, cancelFunc := context.WithTimeout(r.ctx, time.Second*30)
-       defer cancelFunc()
-       streaming, err := r.alsClient.Collect(timeout)
-       if err != nil {
-               return err
-       }
-
-       firstLog := true
-       firstConnection := true
-       for connection, logs := range allLogs {
-               if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
-                       continue
-               }
-               if log.Enable(logrus.DebugLevel) {
-                       log.Debugf("ready to sending access log with 
connection, connection ID: %d, random ID: %d, "+
-                               "local: %s, remote: %s, role: %s, kernel logs 
count: %d, protocol log count: %d",
-                               connection.ConnectionID, connection.RandomID, 
connection.RPCConnection.Local, connection.RPCConnection.Remote,
-                               connection.RPCConnection.Role, 
len(logs.kernels), len(logs.protocols))
-               }
-
-               if len(logs.kernels) > 0 {
-                       r.sendLogToTheStream(streaming, 
r.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels, 
nil))
-                       firstLog, firstConnection = false, false
-               }
-               for _, protocolLog := range logs.protocols {
-                       r.sendLogToTheStream(streaming, 
r.buildAccessLogMessage(firstLog, firstConnection, connection, 
protocolLog.kernels, protocolLog.protocol))
-                       firstLog, firstConnection = false, false
-               }
-
-               firstConnection = true
-       }
-
-       if _, err := streaming.CloseAndRecv(); err != nil {
-               log.Warnf("closing the access log streaming error: %v", err)
-       }
-       return nil
-}
-
-func (r *Runner) sendLogToTheStream(streaming 
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) {
-       if err := streaming.Send(logMsg); err != nil {
-               log.Warnf("send access log message failure: %v", err)
-       }
-}
-
-func (r *Runner) buildAccessLogMessage(firstLog, firstConnection bool, conn 
*common.ConnectionInfo,
-       kernelLogs []*v3.AccessLogKernelLog, protocolLog 
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
-       var rpcCon *v3.AccessLogConnection
-       if firstConnection {
-               rpcCon = conn.RPCConnection
-       }
-       return &v3.EBPFAccessLogMessage{
-               Node:        r.BuildNodeInfo(firstLog),
-               Connection:  rpcCon,
-               KernelLogs:  kernelLogs,
-               ProtocolLog: protocolLog,
-       }
-}
-
-func (r *Runner) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
-       if !needs {
-               return nil
-       }
-       netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
-       for i, n := range host.AllNetworkInterfaces() {
-               netInterfaces = append(netInterfaces, 
&v3.EBPFAccessLogNodeNetInterface{
-                       Index: int32(i),
-                       Mtu:   int32(n.MTU),
-                       Name:  n.Name,
-               })
-       }
-       return &v3.EBPFAccessLogNodeInfo{
-               Name:          
r.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
-               NetInterfaces: netInterfaces,
-               BootTime:      r.convertTimeToInstant(host.BootTime),
-               ClusterName:   r.cluster,
-               Policy: &v3.EBPFAccessLogPolicy{
-                       ExcludeNamespaces: 
r.context.ConnectionMgr.GetExcludeNamespaces(),
-               },
-       }
-}
-
-func (r *Runner) convertTimeToInstant(t time.Time) *v32.Instant {
-       return &v32.Instant{
-               Seconds: t.Unix(),
-               Nanos:   int32(t.Nanosecond()),
-       }
-}
-
 func (r *Runner) shouldReportProcessLog(pid uint32) bool {
        // if the process not monitoring, then check the process is existed or 
not
        if r.context.ConnectionMgr.ProcessIsMonitor(pid) {
@@ -364,20 +254,3 @@ func (r *Runner) Stop() error {
        r.context.ConnectionMgr.Stop()
        return nil
 }
-
-type connectionLogs struct {
-       kernels   []*v3.AccessLogKernelLog
-       protocols []*connectionProtocolLog
-}
-
-type connectionProtocolLog struct {
-       kernels  []*v3.AccessLogKernelLog
-       protocol *v3.AccessLogProtocolLogs
-}
-
-func newConnectionLogs() *connectionLogs {
-       return &connectionLogs{
-               kernels:   make([]*v3.AccessLogKernelLog, 0),
-               protocols: make([]*connectionProtocolLog, 0),
-       }
-}
diff --git a/pkg/accesslog/sender/logs.go b/pkg/accesslog/sender/logs.go
new file mode 100644
index 0000000..d381671
--- /dev/null
+++ b/pkg/accesslog/sender/logs.go
@@ -0,0 +1,106 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+       "github.com/apache/skywalking-rover/pkg/accesslog/common"
+
+       v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var maxLogsPerSend = 10_000
+
+type BatchLogs struct {
+       logs map[*common.ConnectionInfo]*ConnectionLogs
+}
+
+func newBatchLogs() *BatchLogs {
+       return &BatchLogs{
+               logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+       }
+}
+
+func (l *BatchLogs) ConnectionCount() int {
+       return len(l.logs)
+}
+
+func (l *BatchLogs) AppendKernelLog(connection *common.ConnectionInfo, log 
*v3.AccessLogKernelLog) {
+       logs, ok := l.logs[connection]
+       if !ok {
+               logs = newConnectionLogs()
+               l.logs[connection] = logs
+       }
+
+       logs.kernels = append(logs.kernels, log)
+}
+
+func (l *BatchLogs) AppendProtocolLog(connection *common.ConnectionInfo, log 
*v3.AccessLogProtocolLogs) {
+       logs, ok := l.logs[connection]
+       if !ok {
+               logs = newConnectionLogs()
+               l.logs[connection] = logs
+       }
+
+       logs.protocols = append(logs.protocols, &ConnectionProtocolLog{
+               protocol: log,
+       })
+}
+
+// splitBatchLogs partitions the batch into sub-batches that each hold at most
+// maxLogsPerSend connections; it returns nil when the batch is empty.
+// The per-connection log values are shared with the receiver, not copied.
+func (l *BatchLogs) splitBatchLogs() []*BatchLogs {
+       logsCount := len(l.logs)
+       if logsCount == 0 {
+               return nil
+       }
+       result := make([]*BatchLogs, 0, (logsCount+maxLogsPerSend-1)/maxLogsPerSend)
+
+       // split the connections by maxLogsPerSend
+       currentCount := 0
+       var currentBatch *BatchLogs
+       for connection, logs := range l.logs {
+               // open a new sub-batch at the start and whenever the current one is full
+               if currentBatch == nil || currentCount >= maxLogsPerSend {
+                       currentBatch = newBatchLogs()
+                       result = append(result, currentBatch)
+                       currentCount = 0
+               }
+               currentBatch.logs[connection] = logs
+               currentCount++
+       }
+
+       return result
+}
+
+type ConnectionLogs struct {
+       kernels   []*v3.AccessLogKernelLog
+       protocols []*ConnectionProtocolLog
+}
+
+type ConnectionProtocolLog struct {
+       kernels  []*v3.AccessLogKernelLog
+       protocol *v3.AccessLogProtocolLogs
+}
+
+func newConnectionLogs() *ConnectionLogs {
+       return &ConnectionLogs{
+               kernels:   make([]*v3.AccessLogKernelLog, 0),
+               protocols: make([]*ConnectionProtocolLog, 0),
+       }
+}
diff --git a/pkg/accesslog/sender/sender.go b/pkg/accesslog/sender/sender.go
new file mode 100644
index 0000000..9cc5b2f
--- /dev/null
+++ b/pkg/accesslog/sender/sender.go
@@ -0,0 +1,231 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+       "container/list"
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-rover/pkg/accesslog/common"
+       "github.com/apache/skywalking-rover/pkg/core"
+       "github.com/apache/skywalking-rover/pkg/logger"
+       "github.com/apache/skywalking-rover/pkg/module"
+       "github.com/apache/skywalking-rover/pkg/process"
+       "github.com/apache/skywalking-rover/pkg/tools/host"
+
+       "github.com/sirupsen/logrus"
+
+       v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
+       v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
+)
+
+var log = logger.GetLogger("accesslog", "sender")
+
+// GRPCSender asynchronously sends the access logs to the backend
+type GRPCSender struct {
+       logs   *list.List
+       notify chan bool
+       mutex  sync.Mutex
+       ctx    context.Context
+
+       mgr           *module.Manager
+       connectionMgr *common.ConnectionManager
+       alsClient     v3.EBPFAccessLogServiceClient
+       clusterName   string
+}
+
+// NewGRPCSender creates a new GRPCSender
+func NewGRPCSender(mgr *module.Manager, connectionMgr 
*common.ConnectionManager) *GRPCSender {
+       return &GRPCSender{
+               logs:          list.New(),
+               notify:        make(chan bool, 1),
+               mgr:           mgr,
+               connectionMgr: connectionMgr,
+               clusterName:   
mgr.FindModule(core.ModuleName).(core.Operator).ClusterName(),
+               alsClient: 
v3.NewEBPFAccessLogServiceClient(mgr.FindModule(core.ModuleName).(core.Operator).
+                       BackendOperator().GetConnection()),
+       }
+}
+
+func (g *GRPCSender) Start(ctx context.Context) {
+       g.ctx = ctx
+       go func() {
+               for {
+                       select {
+                       case <-g.notify:
+                               if count, err := g.handleLogs(); err != nil {
+                                       log.Warnf("sending access log error, 
lost %d logs, error: %v", count, err)
+                               }
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+}
+
+func (g *GRPCSender) NewBatch() *BatchLogs {
+       return &BatchLogs{
+               logs: make(map[*common.ConnectionInfo]*ConnectionLogs),
+       }
+}
+
+func (g *GRPCSender) AddBatch(batch *BatchLogs) {
+       // split logs
+       splitLogs := batch.splitBatchLogs()
+
+       // append the resend logs
+       g.mutex.Lock()
+       defer g.mutex.Unlock()
+       for _, l := range splitLogs {
+               g.logs.PushBack(l)
+       }
+
+       // notify the sender
+       select {
+       case g.notify <- true:
+       default:
+       }
+}
+
+func (g *GRPCSender) handleLogs() (int, error) {
+       for {
+               // pop logs
+               logs := g.popLogs()
+               if logs == nil {
+                       return 0, nil
+               }
+               // send logs
+               if err := g.sendLogs(logs); err != nil {
+                       return len(logs.logs), err
+               }
+       }
+}
+
+func (g *GRPCSender) sendLogs(batch *BatchLogs) error {
+       timeout, cancelFunc := context.WithTimeout(g.ctx, time.Second*20)
+       defer cancelFunc()
+       streaming, err := g.alsClient.Collect(timeout)
+       if err != nil {
+               return err
+       }
+
+       firstLog := true
+       firstConnection := true
+       var sendError error
+       for connection, logs := range batch.logs {
+               if len(logs.kernels) == 0 && len(logs.protocols) == 0 {
+                       continue
+               }
+               if log.Enable(logrus.DebugLevel) {
+                       log.Debugf("ready to sending access log with 
connection, connection ID: %d, random ID: %d, "+
+                               "local: %s, remote: %s, role: %s, contains 
ztunnel address: %t, kernel logs count: %d, protocol log count: %d",
+                               connection.ConnectionID, connection.RandomID, 
connection.RPCConnection.Local, connection.RPCConnection.Remote,
+                               connection.RPCConnection.Role, 
connection.RPCConnection.Attachment != nil, len(logs.kernels), 
len(logs.protocols))
+               }
+
+               if len(logs.kernels) > 0 {
+                       sendError = g.sendLogToTheStream(streaming, 
g.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels, 
nil))
+                       firstLog, firstConnection = false, false
+               }
+               for _, protocolLog := range logs.protocols {
+                       sendError = g.sendLogToTheStream(streaming, 
g.buildAccessLogMessage(firstLog, firstConnection, connection, 
protocolLog.kernels, protocolLog.protocol))
+                       firstLog, firstConnection = false, false
+               }
+               if sendError != nil {
+                       g.closeStream(streaming)
+                       return fmt.Errorf("sending access log error: %v", 
sendError)
+               }
+
+               firstConnection = true
+       }
+
+       g.closeStream(streaming)
+       return nil
+}
+
+func (g *GRPCSender) closeStream(s v3.EBPFAccessLogService_CollectClient) {
+       if _, err := s.CloseAndRecv(); err != nil {
+               log.Warnf("closing the access log streaming error: %v", err)
+       }
+}
+
+func (g *GRPCSender) sendLogToTheStream(streaming 
v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) error {
+       if err := streaming.Send(logMsg); err != nil {
+               return err
+       }
+       return nil
+}
+
+func (g *GRPCSender) buildAccessLogMessage(firstLog, firstConnection bool, 
conn *common.ConnectionInfo,
+       kernelLogs []*v3.AccessLogKernelLog, protocolLog 
*v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage {
+       var rpcCon *v3.AccessLogConnection
+       if firstConnection {
+               rpcCon = conn.RPCConnection
+       }
+       return &v3.EBPFAccessLogMessage{
+               Node:        g.BuildNodeInfo(firstLog),
+               Connection:  rpcCon,
+               KernelLogs:  kernelLogs,
+               ProtocolLog: protocolLog,
+       }
+}
+
+func (g *GRPCSender) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo {
+       if !needs {
+               return nil
+       }
+       netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0)
+       for i, n := range host.AllNetworkInterfaces() {
+               netInterfaces = append(netInterfaces, 
&v3.EBPFAccessLogNodeNetInterface{
+                       Index: int32(i),
+                       Mtu:   int32(n.MTU),
+                       Name:  n.Name,
+               })
+       }
+       return &v3.EBPFAccessLogNodeInfo{
+               Name:          
g.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(),
+               NetInterfaces: netInterfaces,
+               BootTime:      g.convertTimeToInstant(host.BootTime),
+               ClusterName:   g.clusterName,
+               Policy: &v3.EBPFAccessLogPolicy{
+                       ExcludeNamespaces: 
g.connectionMgr.GetExcludeNamespaces(),
+               },
+       }
+}
+
+func (g *GRPCSender) convertTimeToInstant(t time.Time) *v32.Instant {
+       return &v32.Instant{
+               Seconds: t.Unix(),
+               Nanos:   int32(t.Nanosecond()),
+       }
+}
+
+func (g *GRPCSender) popLogs() *BatchLogs {
+       g.mutex.Lock()
+       defer g.mutex.Unlock()
+       if g.logs.Len() == 0 {
+               return nil
+       }
+       e := g.logs.Front()
+       logs := e.Value.(*BatchLogs)
+       g.logs.Remove(e)
+       return logs
+}

Reply via email to