This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 6803cef remove test code
6803cef is described below
commit 6803cefa6daa994457c6e5b52f016cdd30b89dfb
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 30 12:26:59 2024 +0800
remove test code
---
bpf/accesslog/syscalls/connect.c | 3 +--
pkg/accesslog/collector/connection.go | 22 +++++++++-------------
pkg/accesslog/common/connection.go | 6 +++---
pkg/accesslog/events/close.go | 2 +-
pkg/accesslog/events/detail.go | 2 +-
.../task/network/analyze/layer7/events.go | 1 +
pkg/tools/buffer/buffer.go | 3 +++
pkg/tools/ip/conntrack.go | 4 +---
8 files changed, 20 insertions(+), 23 deletions(-)
diff --git a/bpf/accesslog/syscalls/connect.c b/bpf/accesslog/syscalls/connect.c
index 406829a..061a4ee 100644
--- a/bpf/accesslog/syscalls/connect.c
+++ b/bpf/accesslog/syscalls/connect.c
@@ -32,8 +32,7 @@ static __inline void process_connect(void *ctx, __u64 id, struct connect_args_t
__u32 tgid = id >> 32;
struct sock *sock = connect_args->sock;
struct socket *s = _(sock->sk_socket);
- submit_new_connection(ctx, success, SOCKET_OPTS_TYPE_CONNECT, tgid,
connect_args->fd, connect_args->start_nacs,
- connect_args->addr, s, &connect_args->remote, 0);
+ submit_new_connection(ctx, success, SOCKET_OPTS_TYPE_CONNECT, tgid,
connect_args->fd, connect_args->start_nacs, connect_args->addr, s,
&connect_args->remote, 0);
}
static __inline void process_accept(void *ctx, __u64 id, struct accept_args_t
*accept_args, long ret) {
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index 7550a0d..f0061e3 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -150,7 +150,14 @@ func (c *ConnectionPartitionContext) Consume(data interface{}) {
event.ConID, event.RandomID, event.PID, event.SocketFD,
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
event.SocketFamily, event.ConnectSuccess,
event.ConnTrackUpstreamPort != 0)
socketPair := c.BuildSocketFromConnectEvent(event)
- c.OnConnectionSocketFinished(event, socketPair)
+ if socketPair == nil {
+ connectionLogger.Debugf("cannot found the socket paire
from connect event, connection ID: %d, randomID: %d",
+ event.ConID, event.RandomID)
+ return
+ }
+ connectionLogger.Debugf("build socket pair success, connection
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
+ event.ConID, event.RandomID, socketPair.Role,
socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
+ forwarder.SendConnectEvent(c.context, event, socketPair)
case *events.SocketCloseEvent:
connectionLogger.Debugf("receive close event, connection ID:
%d, randomID: %d, pid: %d, fd: %d",
event.ConnectionID, event.RandomID, event.PID,
event.SocketFD)
@@ -179,17 +186,6 @@ func (c *ConnectionPartitionContext) FixSocketFamilyIfNeed(event *events.SocketC
}
}
-func (c *ConnectionPartitionContext) OnConnectionSocketFinished(event
*events.SocketConnectEvent, socketPair *ip.SocketPair) {
- if socketPair == nil {
- connectionLogger.Debugf("cannot found the socket paire from
connect event, connection ID: %d, randomID: %d",
- event.ConID, event.RandomID)
- return
- }
- connectionLogger.Debugf("build socket pair success, connection ID: %d,
randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
- event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP,
socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
- forwarder.SendConnectEvent(c.context, event, socketPair)
-}
-
func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event
*events.SocketConnectEvent) *ip.SocketPair {
if event.SocketFamily != unix.AF_INET && event.SocketFamily !=
unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
// if not ipv4, ipv6 or unknown, ignore
@@ -211,7 +207,7 @@ func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event *events.S
pair, err := ip.ParseSocket(event.PID, event.SocketFD)
if err != nil {
- log.Debugf("cannot found the socket, pid: %d, socket FD: %d,
error: %v", event.PID, event.SocketFD, err)
+ connectionLogger.Debugf("cannot found the socket, pid: %d,
socket FD: %d, error: %v", event.PID, event.SocketFD, err)
return nil
}
connectionLogger.Debugf("found the connection from the socket,
connection ID: %d, randomID: %d",
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index 12e5a5c..33179e7 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -105,7 +105,7 @@ type ConnectionManager struct {
flushListeners []FlusherListener
- ConnectTracker *ip.ConnTrack
+ connectTracker *ip.ConnTrack
}
func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) {
@@ -150,7 +150,7 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
activeConnectionMap: bpfLoader.ActiveConnectionMap,
monitorFilter: filter,
flushListeners: make([]FlusherListener, 0),
- ConnectTracker: track,
+ connectTracker: track,
}
return mgr
}
@@ -263,7 +263,7 @@ func (c *ConnectionManager) buildRemoteAddress(e *events.SocketConnectEvent, soc
// if the remote connection is need to use conntrack, then update the
real peer address
if socket.NeedConnTrack {
- if err := c.ConnectTracker.UpdateRealPeerAddress(socket); err
!= nil {
+ if err := c.connectTracker.UpdateRealPeerAddress(socket); err
!= nil {
log.Debugf("cannot update the real peer address, %v",
err)
}
}
diff --git a/pkg/accesslog/events/close.go b/pkg/accesslog/events/close.go
index c9cec93..d4aaa1b 100644
--- a/pkg/accesslog/events/close.go
+++ b/pkg/accesslog/events/close.go
@@ -18,9 +18,9 @@
package events
import (
- "github.com/apache/skywalking-rover/pkg/tools/btf"
"time"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index 4716f1f..5beb5df 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -18,9 +18,9 @@
package events
import (
- "github.com/apache/skywalking-rover/pkg/tools/btf"
"time"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 1844430..359e466 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -19,6 +19,7 @@ package layer7
import (
"context"
+
"github.com/apache/skywalking-rover/pkg/tools/buffer"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 7054420..33f6d1c 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -812,6 +812,9 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int {
expireTime := time.Now().Add(-expireDuration)
// data event queue
count := r.deleteEventsWithJudgement(r.dataEvents, func(element
*list.Element) bool {
+ if element.Value == nil {
+ return true
+ }
buffer := element.Value.(SocketDataBuffer)
startTime := host.Time(buffer.StartTime())
if expireTime.After(startTime) {
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 9c2bf37..3e3f934 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -29,9 +29,7 @@ import (
"github.com/apache/skywalking-rover/pkg/logger"
)
-var (
- log = logger.GetLogger("tools", "ip")
-)
+var log = logger.GetLogger("tools", "ip")
var numberStrategies = []struct {
name string