This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 608d1e6  Fix some connections not called close syscall, causing 
unnecessary memory usage (#136)
608d1e6 is described below

commit 608d1e6aebdafd5c7fa574a6b84f4bd01c048c7c
Author: mrproliu <[email protected]>
AuthorDate: Thu Aug 1 16:20:46 2024 +0800

    Fix some connections not called close syscall, causing unnecessary memory 
usage (#136)
---
 CHANGES.md                         |  1 +
 docker/Dockerfile.base             |  3 +-
 pkg/accesslog/common/connection.go | 72 +++++++++++++++++++++++++++++++++-----
 pkg/accesslog/forwarder/close.go   |  3 ++
 pkg/accesslog/runner.go            |  2 +-
 5 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3b22bba..d9fc284 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,6 +17,7 @@ Release Notes.
 * Fix the profiling cannot found process issue.
 * Fix cannot translate peer address in some UDP scenarios.
 * Fix the protocol logs may be missing if the process is short-lived.
+* Fix some connections that did not call the close syscall, causing unnecessary memory 
usage.
 
 #### Documentation
 
diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base
index f4bc05e..0d57cf1 100644
--- a/docker/Dockerfile.base
+++ b/docker/Dockerfile.base
@@ -26,6 +26,7 @@ RUN apt update && \
     cd bpftool && make -C src install && cp $(which bpftool) /usr/sbin/bpftool 
&& \
     wget https://apt.llvm.org/llvm.sh && \
     chmod +x llvm.sh && \
-    ./llvm.sh 18
+    ./llvm.sh 18 && \
+    apt install -y llvm-18
 
 ENV PATH="${PATH}:/usr/lib/llvm-18/bin"
\ No newline at end of file
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 72fbce4..71d1056 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -18,6 +18,7 @@
 package common
 
 import (
+       "context"
        "errors"
        "fmt"
        "strings"
@@ -34,7 +35,9 @@ import (
        "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
        "github.com/apache/skywalking-rover/pkg/tools"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
+       "github.com/apache/skywalking-rover/pkg/tools/host"
        "github.com/apache/skywalking-rover/pkg/tools/ip"
+       "github.com/apache/skywalking-rover/pkg/tools/path"
 
        "github.com/cilium/ebpf"
 
@@ -46,8 +49,13 @@ import (
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
-// only using to match the remote IP address
-const localAddressPairCacheTime = time.Second * 15
+const (
+       // only using to match the remote IP address
+       localAddressPairCacheTime = time.Second * 15
+
+       // interval for cleaning up the active connections in BPF
+       cleanActiveConnectionInterval = time.Second * 20
+)
 
 type addressProcessType int
 
@@ -157,8 +165,56 @@ func NewConnectionManager(config *Config, moduleMgr 
*module.Manager, bpfLoader *
        return mgr
 }
 
-func (c *ConnectionManager) Start() {
+func (c *ConnectionManager) Start(ctx context.Context, accessLogContext 
*AccessLogContext) {
        c.processOP.AddListener(c)
+
+       // start cleaning up the inactive connections in BPF
+       go func() {
+               ticker := time.NewTicker(cleanActiveConnectionInterval)
+               for {
+                       select {
+                       case <-ticker.C:
+                               activeConnections := 
c.activeConnectionMap.Iterate()
+                               var conID uint64
+                               var activateConn ActiveConnection
+                               for activeConnections.Next(&conID, 
&activateConn) {
+                                       // if the connection still exists, then 
check the next one
+                                       pid, fd := 
events.ParseConnectionID(conID)
+                                       if c.checkProcessFDExist(pid, fd) {
+                                               continue
+                                       }
+
+                                       // if the connection no longer exists, 
then delete it
+                                       if err := 
c.activeConnectionMap.Delete(conID); err != nil {
+                                               log.Warnf("failed to delete the 
active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: 
%v",
+                                                       pid, fd, conID, 
activateConn.RandomID, err)
+                                               continue
+                                       }
+                                       log.Debugf("deleted the active 
connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, 
random ID: %d",
+                                               pid, fd, conID, 
activateConn.RandomID)
+
+                                       // build and send the close event
+                                       wapperedEvent := 
c.OnConnectionClose(&events.SocketCloseEvent{
+                                               ConnectionID: conID,
+                                               RandomID:     
activateConn.RandomID,
+                                               StartTime:    0,
+                                               EndTime:      0,
+                                               PID:          activateConn.PID,
+                                               SocketFD:     
activateConn.SocketFD,
+                                               Success:      0,
+                                       })
+                                       
accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent)
+                               }
+
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+}
+
+func (c *ConnectionManager) checkProcessFDExist(pid, fd uint32) bool {
+       return path.Exists(host.GetFileInHost(fmt.Sprintf("/proc/%d/fd/%d", 
pid, fd)))
 }
 
 func (c *ConnectionManager) Stop() {
@@ -358,20 +414,20 @@ func (c *ConnectionManager) OnConnectionClose(event 
*events.SocketCloseEvent) *C
        return result
 }
 
-func (c *ConnectionManager) savingTheAddress(host string, port uint16, 
localPid bool, pid uint32) {
+func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16, 
localPid bool, pid uint32) {
        localAddrInfo := &addressInfo{
                pid: pid,
        }
-       c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", host, port, localPid), 
localAddrInfo, localAddressPairCacheTime)
+       c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port, 
localPid), localAddrInfo, localAddressPairCacheTime)
        localStr := strRemote
        if localPid {
                localStr = strLocal
        }
-       log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: 
%d", localStr, host, port, pid)
+       log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: 
%d", localStr, hostAddress, port, pid)
 }
 
-func (c *ConnectionManager) getAddressPid(host string, port uint16, localPid 
bool) *addressInfo {
-       addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", host, 
port, localPid))
+func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16, 
localPid bool) *addressInfo {
+       addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", 
hostAddress, port, localPid))
        if ok && addrInfo != nil {
                return addrInfo.(*addressInfo)
        }
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go
index 239c18d..5afbec3 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/forwarder/close.go
@@ -34,6 +34,9 @@ func SendCloseEvent(context *common.AccessLogContext, event 
*common.CloseEventWi
 
 func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
        closeEvent := event.(*common.CloseEventWithNotify)
+       if closeEvent.StartTime == 0 {
+               return nil
+       }
        closeOp := &v3.AccessLogKernelCloseOperation{}
        closeOp.StartTime = BuildOffsetTimestamp(closeEvent.StartTime)
        closeOp.EndTime = BuildOffsetTimestamp(closeEvent.EndTime)
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index 8164e81..e79a81b 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -88,7 +88,7 @@ func (r *Runner) Start(ctx context.Context) error {
        r.ctx = ctx
        r.context.RuntimeContext = ctx
        r.context.Queue.Start(ctx)
-       r.context.ConnectionMgr.Start()
+       r.context.ConnectionMgr.Start(ctx, r.context)
        for _, c := range r.collectors {
                err := c.Start(r.mgr, r.context)
                if err != nil {

Reply via email to