This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 3b16ce3 add pod IP parallels searching
3b16ce3 is described below
commit 3b16ce30eb856e48ba918f1e5e890a83800df3ee
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 20:56:51 2024 +0800
add pod IP parallels searching
---
pkg/process/finders/kubernetes/finder.go | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git a/pkg/process/finders/kubernetes/finder.go b/pkg/process/finders/kubernetes/finder.go
index ec99938..a9043dd 100644
--- a/pkg/process/finders/kubernetes/finder.go
+++ b/pkg/process/finders/kubernetes/finder.go
@@ -21,11 +21,13 @@ import (
"bufio"
"context"
"fmt"
+ "hash/fnv"
"k8s.io/apimachinery/pkg/util/rand"
"os"
"regexp"
"strconv"
"strings"
+ "sync"
"time"
lru "github.com/hashicorp/golang-lru"
@@ -58,6 +60,7 @@ var (
kubepodsRegex =
regexp.MustCompile(`cri-containerd-(?P<Group>\w+)\.scope`)
openShiftPodsRegex = regexp.MustCompile(`crio-(?P<Group>\w+)\.scope`)
ipExistTimeout = time.Minute * 10
+ ipSearchParallel = 10
)
type ProcessFinder struct {
@@ -81,6 +84,7 @@ type ProcessFinder struct {
// for IsPodIP check
podIPChecker *cache.Expiring
+ podIPMutexes map[int]sync.Mutex
}
func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig,
manager base.ProcessManager) error {
@@ -98,6 +102,10 @@ func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, ma
f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
f.manager = manager
f.podIPChecker = cache.NewExpiring()
+ f.podIPMutexes = make(map[int]sync.Mutex)
+ for i := range ipSearchParallel {
+ f.podIPMutexes[i] = sync.Mutex{}
+ }
processCache, err := lru.New(5000)
if err != nil {
return err
@@ -415,6 +423,15 @@ func (f *ProcessFinder) IsPodIP(ip string) (bool, error) {
if exist {
return val.(bool), nil
}
+
+ // parallels the search
+ h := fnv.New32a()
+ h.Write([]byte(ip))
+ sum32 := int(h.Sum32())
+ mutex := f.podIPMutexes[sum32%ipSearchParallel]
+ mutex.Lock()
+ defer mutex.Unlock()
+
pods, err := f.CLI.CoreV1().Pods(v1.NamespaceAll).List(f.ctx,
metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("status.podIP",
ip).String(),
Limit: 1,
@@ -425,7 +442,7 @@ func (f *ProcessFinder) IsPodIP(ip string) (bool, error) {
found := len(pods.Items) > 0
// the timeout added a random value to avoid the cache avalanche
- addedTime := time.Millisecond * time.Duration(rand.IntnRange(500, 5000))
+ addedTime := time.Millisecond * time.Duration(rand.IntnRange(1000, 10000))
f.podIPChecker.Set(ip, found, ipExistTimeout+addedTime)
return found, nil
}