This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 127c9a8 cache ip more time
127c9a8 is described below
commit 127c9a8f01aaa194487355863578fb79fb3c1096
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 18:03:02 2024 +0800
cache ip more time
---
pkg/accesslog/collector/protocols/queue.go | 17 --------
pkg/accesslog/common/connection.go | 64 +++++++-----------------------
pkg/tools/ip.go | 24 ++++++-----
3 files changed, 29 insertions(+), 76 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index a4577f7..c3ccd4c 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -166,20 +166,6 @@ func NewPartitionContext(ctx *common.AccessLogContext, num int, protocols []Prot
return pc
}
-func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent,
closeCallback common.ConnectionProcessFinishCallback) {
- conn, exist :=
p.connections.Get(p.buildConnectionKey(event.GetConnectionID(),
event.GetRandomID()))
- if !exist {
- log.Debugf("connection is not exist in the partion context,
connection ID: %d, random ID: %d, partition number: %d",
- event.GetConnectionID(), event.GetRandomID(),
p.partitionNum)
- closeCallback()
- return
- }
- connection := conn.(*PartitionConnection)
- connection.closeCallback = closeCallback
- log.Debugf("receive the connection close event and mark is closable,
connection ID: %d, random ID: %d, partition number: %d",
- event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
-}
-
func (p *PartitionContext) Start(ctx context.Context) {
// process events with interval
flushDuration, _ := time.ParseDuration(p.context.Config.Flush.Period)
@@ -284,9 +270,6 @@ func (p *PartitionContext) processEvents() {
p.checkTheConnectionIsAlreadyClose(info)
}
if info.closed {
- if info.closeCallback != nil {
- info.closeCallback()
- }
closedConnections = append(closedConnections, conKey)
log.Debugf("detect the connection is already closed,
then notify to the callback, connection ID: %d, random ID: %d, partition
number: %d",
info.connectionID, info.randomID,
p.partitionNum)
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index 9b8bd1a..cbcb925 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -82,13 +82,11 @@ type ConnectEventWithSocket struct {
type CloseEventWithNotify struct {
*events.SocketCloseEvent
- allProcessorFinished bool
}
type ConnectionProcessFinishCallback func()
type ConnectionProcessor interface {
- OnConnectionClose(event *events.SocketCloseEvent, callback
ConnectionProcessFinishCallback)
}
type FlusherListener interface {
@@ -165,19 +163,18 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
log.Warnf("cannot create the connection tracker, %v", err)
}
mgr := &ConnectionManager{
- moduleMgr: moduleMgr,
- processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
- connections: cmap.New(),
- addressWithPid: cache.NewExpiring(),
- localPortWithPid: cache.NewExpiring(),
- localIPWithPid: make(map[string]int32),
- monitoringProcesses:
make(map[int32][]api.ProcessInterface),
- processMonitorMap: bpfLoader.ProcessMonitorControl,
- activeConnectionMap: bpfLoader.ActiveConnectionMap,
- allUnfinishedConnections: make(map[string]*bool),
- monitorFilter: filter,
- flushListeners: make([]FlusherListener, 0),
- ConnectTracker: track,
+ moduleMgr: moduleMgr,
+ processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
+ connections: cmap.New(),
+ addressWithPid: cache.NewExpiring(),
+ localPortWithPid: cache.NewExpiring(),
+ localIPWithPid: make(map[string]int32),
+ monitoringProcesses: make(map[int32][]api.ProcessInterface),
+ processMonitorMap: bpfLoader.ProcessMonitorControl,
+ activeConnectionMap: bpfLoader.ActiveConnectionMap,
+ monitorFilter: filter,
+ flushListeners: make([]FlusherListener, 0),
+ ConnectTracker: track,
}
return mgr
}
@@ -337,12 +334,7 @@ func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, eve
}
switch e := event.(type) {
case *CloseEventWithNotify:
- if e.allProcessorFinished {
- connection.MarkDeletable = true
- } else {
- // if not all processor finished, then add into the map
- c.allUnfinishedConnections[fmt.Sprintf("%d_%d",
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
- }
+ connection.MarkDeletable = true
case events.SocketDetail:
tlsMode := connection.RPCConnection.TlsMode
protocol := connection.RPCConnection.Protocol
@@ -467,21 +459,9 @@ func (c *ConnectionManager) buildAddressFromRemote(ipHost string, port uint16) *
}
func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent)
*CloseEventWithNotify {
- result := &CloseEventWithNotify{
- SocketCloseEvent: event,
- allProcessorFinished: false,
- }
- processCount := len(c.processors)
- for _, l := range c.processors {
- l.OnConnectionClose(event, func() {
- processCount--
- if processCount > 0 {
- return
- }
- result.allProcessorFinished = true
- })
+ return &CloseEventWithNotify{
+ SocketCloseEvent: event,
}
- return result
}
func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16,
localPid bool, pid uint32) {
@@ -737,20 +717,6 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
}
})
- deleteFromUnfinished := make([]string, 0)
- for conKey, processorFinished := range c.allUnfinishedConnections {
- if *processorFinished {
- deletableConnections[conKey] = true
- deleteFromUnfinished = append(deleteFromUnfinished,
conKey)
- } else {
- // if the processor not finished, then ignore it from
deletable connections
- delete(deletableConnections, conKey)
- }
- }
- for _, key := range deleteFromUnfinished {
- delete(c.allUnfinishedConnections, key)
- }
-
for key := range deletableConnections {
log.Debugf("deleting the connection in manager: %s", key)
c.connections.Remove(key)
diff --git a/pkg/tools/ip.go b/pkg/tools/ip.go
index 24305b9..b3a34fa 100644
--- a/pkg/tools/ip.go
+++ b/pkg/tools/ip.go
@@ -38,7 +38,7 @@ func DefaultHostIPAddress() string {
// HostIPAddressV4 found the IPV4 address from appoint net interface name
func HostIPAddressV4(name string) string {
- address := host.ipAddresses[name]
+ address := host.ipAddressesByName[name]
if address == nil {
return ""
}
@@ -47,7 +47,7 @@ func HostIPAddressV4(name string) string {
// HostIPAddressV6 found the IPV6 address from appoint net interface name
func HostIPAddressV6(name string) string {
- address := host.ipAddresses[name]
+ address := host.ipAddressesByName[name]
if address == nil {
return ""
}
@@ -56,10 +56,8 @@ func HostIPAddressV6(name string) string {
// IsLocalHostAddress is the address from local
func IsLocalHostAddress(address string) bool {
- for _, h := range host.ipAddresses {
- if h.ipV4 == address || h.ipV6 == address {
- return true
- }
+ if host.ipAddressesByIP[address] {
+ return true
}
return address == "0.0.0.0"
}
@@ -73,8 +71,9 @@ type hostInfo struct {
// hostname
name string
// ip address
- ipAddresses map[string]*hostIPAddress
- defaultIPAddr string
+ ipAddressesByName map[string]*hostIPAddress
+ ipAddressesByIP map[string]bool
+ defaultIPAddr string
}
type hostIPAddress struct {
@@ -83,7 +82,7 @@ type hostIPAddress struct {
}
func queryHostInfo() *hostInfo {
- addresses, def, err := localIPAddress0()
+ addressesByName, def, err := localIPAddress0()
if err != nil {
panic(err)
}
@@ -91,7 +90,12 @@ func queryHostInfo() *hostInfo {
if err != nil {
panic(err)
}
- return &hostInfo{name: name, ipAddresses: addresses, defaultIPAddr: def}
+ addressesByIP := make(map[string]bool)
+ for _, addr := range addressesByName {
+ addressesByIP[addr.ipV4] = true
+ addressesByIP[addr.ipV6] = true
+ }
+ return &hostInfo{name: name, ipAddressesByName: addressesByName,
ipAddressesByIP: addressesByIP, defaultIPAddr: def}
}
func hostname0() (string, error) {