This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 608d1e6 Fix some connections not called close syscall, causing
unnecessary memory usage (#136)
608d1e6 is described below
commit 608d1e6aebdafd5c7fa574a6b84f4bd01c048c7c
Author: mrproliu <[email protected]>
AuthorDate: Thu Aug 1 16:20:46 2024 +0800
Fix some connections not called close syscall, causing unnecessary memory
usage (#136)
---
CHANGES.md | 1 +
docker/Dockerfile.base | 3 +-
pkg/accesslog/common/connection.go | 72 +++++++++++++++++++++++++++++++++-----
pkg/accesslog/forwarder/close.go | 3 ++
pkg/accesslog/runner.go | 2 +-
5 files changed, 71 insertions(+), 10 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3b22bba..d9fc284 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,6 +17,7 @@ Release Notes.
* Fix the profiling cannot found process issue.
* Fix cannot translate peer address in some UDP scenarios.
* Fix the protocol logs may be missing if the process is short-lived.
+* Fix some connections not calling the close syscall, causing unnecessary memory
usage.
#### Documentation
diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base
index f4bc05e..0d57cf1 100644
--- a/docker/Dockerfile.base
+++ b/docker/Dockerfile.base
@@ -26,6 +26,7 @@ RUN apt update && \
cd bpftool && make -C src install && cp $(which bpftool) /usr/sbin/bpftool
&& \
wget https://apt.llvm.org/llvm.sh && \
chmod +x llvm.sh && \
- ./llvm.sh 18
+ ./llvm.sh 18 && \
+ apt install -y llvm-18
ENV PATH="${PATH}:/usr/lib/llvm-18/bin"
\ No newline at end of file
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index 72fbce4..71d1056 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -18,6 +18,7 @@
package common
import (
+ "context"
"errors"
"fmt"
"strings"
@@ -34,7 +35,9 @@ import (
"github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/enums"
+ "github.com/apache/skywalking-rover/pkg/tools/host"
"github.com/apache/skywalking-rover/pkg/tools/ip"
+ "github.com/apache/skywalking-rover/pkg/tools/path"
"github.com/cilium/ebpf"
@@ -46,8 +49,13 @@ import (
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
-// only using to match the remote IP address
-const localAddressPairCacheTime = time.Second * 15
+const (
+ // only using to match the remote IP address
+ localAddressPairCacheTime = time.Second * 15
+
+ // interval at which the active connections in BPF are cleaned up
+ cleanActiveConnectionInterval = time.Second * 20
+)
type addressProcessType int
@@ -157,8 +165,56 @@ func NewConnectionManager(config *Config, moduleMgr
*module.Manager, bpfLoader *
return mgr
}
-func (c *ConnectionManager) Start() {
+func (c *ConnectionManager) Start(ctx context.Context, accessLogContext
*AccessLogContext) {
c.processOP.AddListener(c)
+
+ // start periodically cleaning up the inactive connections in BPF
+ go func() {
+ ticker := time.NewTicker(cleanActiveConnectionInterval)
+ for {
+ select {
+ case <-ticker.C:
+ activeConnections :=
c.activeConnectionMap.Iterate()
+ var conID uint64
+ var activateConn ActiveConnection
+ for activeConnections.Next(&conID,
&activateConn) {
+ // if the connection still exists, then
check the next one
+ pid, fd :=
events.ParseConnectionID(conID)
+ if c.checkProcessFDExist(pid, fd) {
+ continue
+ }
+
+ // if the connection no longer exists,
then delete it
+ if err :=
c.activeConnectionMap.Delete(conID); err != nil {
+ log.Warnf("failed to delete the
active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error:
%v",
+ pid, fd, conID,
activateConn.RandomID, err)
+ continue
+ }
+ log.Debugf("deleted the active
connection as not exist in file system, pid: %d, fd: %d, connection ID: %d,
random ID: %d",
+ pid, fd, conID,
activateConn.RandomID)
+
+ // build and send the close event
+ wapperedEvent :=
c.OnConnectionClose(&events.SocketCloseEvent{
+ ConnectionID: conID,
+ RandomID:
activateConn.RandomID,
+ StartTime: 0,
+ EndTime: 0,
+ PID: activateConn.PID,
+ SocketFD:
activateConn.SocketFD,
+ Success: 0,
+ })
+
accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent)
+ }
+
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+func (c *ConnectionManager) checkProcessFDExist(pid, fd uint32) bool {
+ return path.Exists(host.GetFileInHost(fmt.Sprintf("/proc/%d/fd/%d",
pid, fd)))
}
func (c *ConnectionManager) Stop() {
@@ -358,20 +414,20 @@ func (c *ConnectionManager) OnConnectionClose(event
*events.SocketCloseEvent) *C
return result
}
-func (c *ConnectionManager) savingTheAddress(host string, port uint16,
localPid bool, pid uint32) {
+func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16,
localPid bool, pid uint32) {
localAddrInfo := &addressInfo{
pid: pid,
}
- c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", host, port, localPid),
localAddrInfo, localAddressPairCacheTime)
+ c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port,
localPid), localAddrInfo, localAddressPairCacheTime)
localStr := strRemote
if localPid {
localStr = strLocal
}
- log.Debugf("saving the %s address with pid cache, address: %s:%d, pid:
%d", localStr, host, port, pid)
+ log.Debugf("saving the %s address with pid cache, address: %s:%d, pid:
%d", localStr, hostAddress, port, pid)
}
-func (c *ConnectionManager) getAddressPid(host string, port uint16, localPid
bool) *addressInfo {
- addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", host,
port, localPid))
+func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16,
localPid bool) *addressInfo {
+ addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t",
hostAddress, port, localPid))
if ok && addrInfo != nil {
return addrInfo.(*addressInfo)
}
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go
index 239c18d..5afbec3 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/forwarder/close.go
@@ -34,6 +34,9 @@ func SendCloseEvent(context *common.AccessLogContext, event
*common.CloseEventWi
func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
closeEvent := event.(*common.CloseEventWithNotify)
+ if closeEvent.StartTime == 0 {
+ return nil
+ }
closeOp := &v3.AccessLogKernelCloseOperation{}
closeOp.StartTime = BuildOffsetTimestamp(closeEvent.StartTime)
closeOp.EndTime = BuildOffsetTimestamp(closeEvent.EndTime)
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index 8164e81..e79a81b 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -88,7 +88,7 @@ func (r *Runner) Start(ctx context.Context) error {
r.ctx = ctx
r.context.RuntimeContext = ctx
r.context.Queue.Start(ctx)
- r.context.ConnectionMgr.Start()
+ r.context.ConnectionMgr.Start(ctx, r.context)
for _, c := range r.collectors {
err := c.Start(r.mgr, r.context)
if err != nil {