This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by 
this push:
     new 3b16ce3  add pod IP parallels searching
3b16ce3 is described below

commit 3b16ce30eb856e48ba918f1e5e890a83800df3ee
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 20:56:51 2024 +0800

    add pod IP parallels searching
---
 pkg/process/finders/kubernetes/finder.go | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/pkg/process/finders/kubernetes/finder.go 
b/pkg/process/finders/kubernetes/finder.go
index ec99938..a9043dd 100644
--- a/pkg/process/finders/kubernetes/finder.go
+++ b/pkg/process/finders/kubernetes/finder.go
@@ -21,11 +21,13 @@ import (
        "bufio"
        "context"
        "fmt"
+       "hash/fnv"
        "k8s.io/apimachinery/pkg/util/rand"
        "os"
        "regexp"
        "strconv"
        "strings"
+       "sync"
        "time"
 
        lru "github.com/hashicorp/golang-lru"
@@ -58,6 +60,7 @@ var (
        kubepodsRegex      = 
regexp.MustCompile(`cri-containerd-(?P<Group>\w+)\.scope`)
        openShiftPodsRegex = regexp.MustCompile(`crio-(?P<Group>\w+)\.scope`)
        ipExistTimeout     = time.Minute * 10
+       ipSearchParallel   = 10
 )
 
 type ProcessFinder struct {
@@ -81,6 +84,7 @@ type ProcessFinder struct {
 
        // for IsPodIP check
        podIPChecker *cache.Expiring
+       podIPMutexes map[int]sync.Mutex
 }
 
 func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, 
manager base.ProcessManager) error {
@@ -98,6 +102,10 @@ func (f *ProcessFinder) Init(ctx context.Context, conf 
base.FinderBaseConfig, ma
        f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
        f.manager = manager
        f.podIPChecker = cache.NewExpiring()
+       f.podIPMutexes = make(map[int]sync.Mutex)
+       for i := range ipSearchParallel {
+               f.podIPMutexes[i] = sync.Mutex{}
+       }
        processCache, err := lru.New(5000)
        if err != nil {
                return err
@@ -415,6 +423,15 @@ func (f *ProcessFinder) IsPodIP(ip string) (bool, error) {
        if exist {
                return val.(bool), nil
        }
+
+       // parallels the search
+       h := fnv.New32a()
+       h.Write([]byte(ip))
+       sum32 := int(h.Sum32())
+       mutex := f.podIPMutexes[sum32%ipSearchParallel]
+       mutex.Lock()
+       defer mutex.Unlock()
+
        pods, err := f.CLI.CoreV1().Pods(v1.NamespaceAll).List(f.ctx, 
metav1.ListOptions{
                FieldSelector: fields.OneTermEqualSelector("status.podIP", 
ip).String(),
                Limit:         1,
@@ -425,7 +442,7 @@ func (f *ProcessFinder) IsPodIP(ip string) (bool, error) {
        found := len(pods.Items) > 0
 
        // the timeout added a random value to avoid the cache avalanche
-       addedTime := time.Millisecond * time.Duration(rand.IntnRange(500, 5000))
+       addedTime := time.Millisecond * time.Duration(rand.IntnRange(1000, 
10000))
        f.podIPChecker.Set(ip, found, ipExistTimeout+addedTime)
        return found, nil
 }

Reply via email to