This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by this push:
     new 127c9a8  cache ip more time
127c9a8 is described below

commit 127c9a8f01aaa194487355863578fb79fb3c1096
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 18:03:02 2024 +0800

    cache ip more time
---
 pkg/accesslog/collector/protocols/queue.go | 17 --------
 pkg/accesslog/common/connection.go         | 64 +++++++-----------------------
 pkg/tools/ip.go                            | 24 ++++++-----
 3 files changed, 29 insertions(+), 76 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index a4577f7..c3ccd4c 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -166,20 +166,6 @@ func NewPartitionContext(ctx *common.AccessLogContext, num 
int, protocols []Prot
        return pc
 }
 
-func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent, 
closeCallback common.ConnectionProcessFinishCallback) {
-       conn, exist := 
p.connections.Get(p.buildConnectionKey(event.GetConnectionID(), 
event.GetRandomID()))
-       if !exist {
-               log.Debugf("connection is not exist in the partion context, 
connection ID: %d, random ID: %d, partition number: %d",
-                       event.GetConnectionID(), event.GetRandomID(), 
p.partitionNum)
-               closeCallback()
-               return
-       }
-       connection := conn.(*PartitionConnection)
-       connection.closeCallback = closeCallback
-       log.Debugf("receive the connection close event and mark is closable, 
connection ID: %d, random ID: %d, partition number: %d",
-               event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
-}
-
 func (p *PartitionContext) Start(ctx context.Context) {
        // process events with interval
        flushDuration, _ := time.ParseDuration(p.context.Config.Flush.Period)
@@ -284,9 +270,6 @@ func (p *PartitionContext) processEvents() {
                        p.checkTheConnectionIsAlreadyClose(info)
                }
                if info.closed {
-                       if info.closeCallback != nil {
-                               info.closeCallback()
-                       }
                        closedConnections = append(closedConnections, conKey)
                        log.Debugf("detect the connection is already closed, 
then notify to the callback, connection ID: %d, random ID: %d, partition 
number: %d",
                                info.connectionID, info.randomID, 
p.partitionNum)
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 9b8bd1a..cbcb925 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -82,13 +82,11 @@ type ConnectEventWithSocket struct {
 
 type CloseEventWithNotify struct {
        *events.SocketCloseEvent
-       allProcessorFinished bool
 }
 
 type ConnectionProcessFinishCallback func()
 
 type ConnectionProcessor interface {
-       OnConnectionClose(event *events.SocketCloseEvent, callback 
ConnectionProcessFinishCallback)
 }
 
 type FlusherListener interface {
@@ -165,19 +163,18 @@ func NewConnectionManager(config *Config, moduleMgr 
*module.Manager, bpfLoader *
                log.Warnf("cannot create the connection tracker, %v", err)
        }
        mgr := &ConnectionManager{
-               moduleMgr:                moduleMgr,
-               processOP:                
moduleMgr.FindModule(process.ModuleName).(process.Operator),
-               connections:              cmap.New(),
-               addressWithPid:           cache.NewExpiring(),
-               localPortWithPid:         cache.NewExpiring(),
-               localIPWithPid:           make(map[string]int32),
-               monitoringProcesses:      
make(map[int32][]api.ProcessInterface),
-               processMonitorMap:        bpfLoader.ProcessMonitorControl,
-               activeConnectionMap:      bpfLoader.ActiveConnectionMap,
-               allUnfinishedConnections: make(map[string]*bool),
-               monitorFilter:            filter,
-               flushListeners:           make([]FlusherListener, 0),
-               ConnectTracker:           track,
+               moduleMgr:           moduleMgr,
+               processOP:           
moduleMgr.FindModule(process.ModuleName).(process.Operator),
+               connections:         cmap.New(),
+               addressWithPid:      cache.NewExpiring(),
+               localPortWithPid:    cache.NewExpiring(),
+               localIPWithPid:      make(map[string]int32),
+               monitoringProcesses: make(map[int32][]api.ProcessInterface),
+               processMonitorMap:   bpfLoader.ProcessMonitorControl,
+               activeConnectionMap: bpfLoader.ActiveConnectionMap,
+               monitorFilter:       filter,
+               flushListeners:      make([]FlusherListener, 0),
+               ConnectTracker:      track,
        }
        return mgr
 }
@@ -337,12 +334,7 @@ func (c *ConnectionManager) 
connectionPostHandle(connection *ConnectionInfo, eve
        }
        switch e := event.(type) {
        case *CloseEventWithNotify:
-               if e.allProcessorFinished {
-                       connection.MarkDeletable = true
-               } else {
-                       // if not all processor finished, then add into the map
-                       c.allUnfinishedConnections[fmt.Sprintf("%d_%d", 
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
-               }
+               connection.MarkDeletable = true
        case events.SocketDetail:
                tlsMode := connection.RPCConnection.TlsMode
                protocol := connection.RPCConnection.Protocol
@@ -467,21 +459,9 @@ func (c *ConnectionManager) buildAddressFromRemote(ipHost 
string, port uint16) *
 }
 
 func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent) 
*CloseEventWithNotify {
-       result := &CloseEventWithNotify{
-               SocketCloseEvent:     event,
-               allProcessorFinished: false,
-       }
-       processCount := len(c.processors)
-       for _, l := range c.processors {
-               l.OnConnectionClose(event, func() {
-                       processCount--
-                       if processCount > 0 {
-                               return
-                       }
-                       result.allProcessorFinished = true
-               })
+       return &CloseEventWithNotify{
+               SocketCloseEvent: event,
        }
-       return result
 }
 
 func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16, 
localPid bool, pid uint32) {
@@ -737,20 +717,6 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() 
{
                }
        })
 
-       deleteFromUnfinished := make([]string, 0)
-       for conKey, processorFinished := range c.allUnfinishedConnections {
-               if *processorFinished {
-                       deletableConnections[conKey] = true
-                       deleteFromUnfinished = append(deleteFromUnfinished, 
conKey)
-               } else {
-                       // if the processor not finished, then ignore it from 
deletable connections
-                       delete(deletableConnections, conKey)
-               }
-       }
-       for _, key := range deleteFromUnfinished {
-               delete(c.allUnfinishedConnections, key)
-       }
-
        for key := range deletableConnections {
                log.Debugf("deleting the connection in manager: %s", key)
                c.connections.Remove(key)
diff --git a/pkg/tools/ip.go b/pkg/tools/ip.go
index 24305b9..b3a34fa 100644
--- a/pkg/tools/ip.go
+++ b/pkg/tools/ip.go
@@ -38,7 +38,7 @@ func DefaultHostIPAddress() string {
 
 // HostIPAddressV4 found the IPV4 address from appoint net interface name
 func HostIPAddressV4(name string) string {
-       address := host.ipAddresses[name]
+       address := host.ipAddressesByName[name]
        if address == nil {
                return ""
        }
@@ -47,7 +47,7 @@ func HostIPAddressV4(name string) string {
 
 // HostIPAddressV6 found the IPV6 address from appoint net interface name
 func HostIPAddressV6(name string) string {
-       address := host.ipAddresses[name]
+       address := host.ipAddressesByName[name]
        if address == nil {
                return ""
        }
@@ -56,10 +56,8 @@ func HostIPAddressV6(name string) string {
 
 // IsLocalHostAddress is the address from local
 func IsLocalHostAddress(address string) bool {
-       for _, h := range host.ipAddresses {
-               if h.ipV4 == address || h.ipV6 == address {
-                       return true
-               }
+       if host.ipAddressesByIP[address] {
+               return true
        }
        return address == "0.0.0.0"
 }
@@ -73,8 +71,9 @@ type hostInfo struct {
        // hostname
        name string
        // ip address
-       ipAddresses   map[string]*hostIPAddress
-       defaultIPAddr string
+       ipAddressesByName map[string]*hostIPAddress
+       ipAddressesByIP   map[string]bool
+       defaultIPAddr     string
 }
 
 type hostIPAddress struct {
@@ -83,7 +82,7 @@ type hostIPAddress struct {
 }
 
 func queryHostInfo() *hostInfo {
-       addresses, def, err := localIPAddress0()
+       addressesByName, def, err := localIPAddress0()
        if err != nil {
                panic(err)
        }
@@ -91,7 +90,12 @@ func queryHostInfo() *hostInfo {
        if err != nil {
                panic(err)
        }
-       return &hostInfo{name: name, ipAddresses: addresses, defaultIPAddr: def}
+       addressesByIP := make(map[string]bool)
+       for _, addr := range addressesByName {
+               addressesByIP[addr.ipV4] = true
+               addressesByIP[addr.ipV6] = true
+       }
+       return &hostInfo{name: name, ipAddressesByName: addressesByName, 
ipAddressesByIP: addressesByIP, defaultIPAddr: def}
 }
 
 func hostname0() (string, error) {

Reply via email to