This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new ad26ea9  Redefine the Kubernetes process registry (#189)
ad26ea9 is described below

commit ad26ea9567553f9a4edde198bc8fc50703f5fec9
Author: mrproliu <[email protected]>
AuthorDate: Fri Apr 11 14:06:38 2025 +0800

    Redefine the Kubernetes process registry (#189)
---
 pkg/process/finders/kubernetes/container.go |  4 ++--
 pkg/process/finders/kubernetes/finder.go    | 29 +++++++++++++++++------------
 pkg/process/finders/kubernetes/registry.go  | 26 ++++++++++++++++----------
 3 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/pkg/process/finders/kubernetes/container.go 
b/pkg/process/finders/kubernetes/container.go
index 3ed9c38..c5d0359 100644
--- a/pkg/process/finders/kubernetes/container.go
+++ b/pkg/process/finders/kubernetes/container.go
@@ -39,11 +39,11 @@ type PodContainer struct {
        ContainerStatus v1.ContainerStatus
 
        // the kubernetes resource registry
-       registry *Registry
+       registry Registry
 }
 
 // AnalyzeContainers means query the containers by pod
-func AnalyzeContainers(pod *v1.Pod, registry *Registry) []*PodContainer {
+func AnalyzeContainers(pod *v1.Pod, registry Registry) []*PodContainer {
        containers := make([]*PodContainer, 0)
        // nolint
        for _, cs := range pod.Status.ContainerStatuses {
diff --git a/pkg/process/finders/kubernetes/finder.go 
b/pkg/process/finders/kubernetes/finder.go
index 58282ea..fd70936 100644
--- a/pkg/process/finders/kubernetes/finder.go
+++ b/pkg/process/finders/kubernetes/finder.go
@@ -76,18 +76,30 @@ type ProcessFinder struct {
 
        // k8s clients
        k8sConfig *rest.Config
-       registry  *Registry
+       registry  Registry
        CLI       *kubernetes.Clientset
 
-       // runtime config
-       namespaces []string
-
        // for IsPodIP check
        podIPChecker *cache.Expiring
        podIPMutexes map[int]*sync.Mutex
 }
 
 func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, 
manager base.ProcessManager) error {
+       return f.InitWithRegistry(ctx, conf, manager, func(clientset 
*kubernetes.Clientset) Registry {
+               config := conf.(*Config)
+               var namespaces []string
+               // namespace update
+               if config.Namespaces != "" {
+                       namespaces = strings.Split(config.Namespaces, ",")
+               } else {
+                       namespaces = []string{v1.NamespaceAll}
+               }
+               return NewStaticNamespaceRegistry(clientset, namespaces, 
config.NodeName)
+       })
+}
+
+func (f *ProcessFinder) InitWithRegistry(ctx context.Context, conf 
base.FinderBaseConfig, manager base.ProcessManager,
+       registrySupplier func(*kubernetes.Clientset) Registry) error {
        f.clusterName = 
manager.GetModuleManager().FindModule(core.ModuleName).(core.Operator).ClusterName()
        k8sConf, cli, err := f.validateConfig(ctx, conf.(*Config))
        if err != nil {
@@ -99,7 +111,7 @@ func (f *ProcessFinder) Init(ctx context.Context, conf 
base.FinderBaseConfig, ma
 
        f.ctx, f.cancelCtx = context.WithCancel(ctx)
        f.stopChan = make(chan struct{}, 1)
-       f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
+       f.registry = registrySupplier(cli)
        f.manager = manager
        f.podIPChecker = cache.NewExpiring()
        f.podIPMutexes = make(map[int]*sync.Mutex)
@@ -132,13 +144,6 @@ func (f *ProcessFinder) validateConfig(ctx 
context.Context, conf *Config) (*rest
                return nil, nil, fmt.Errorf("could not found the node: %s, %v", 
conf.NodeName, err)
        }
 
-       // namespace update
-       if conf.Namespaces != "" {
-               f.namespaces = strings.Split(conf.Namespaces, ",")
-       } else {
-               f.namespaces = []string{v1.NamespaceAll}
-       }
-
        // process builders
        if err := ProcessBuildersInit(conf.Analyzers); err != nil {
                return nil, nil, err
diff --git a/pkg/process/finders/kubernetes/registry.go 
b/pkg/process/finders/kubernetes/registry.go
index e2d464d..3e0123c 100644
--- a/pkg/process/finders/kubernetes/registry.go
+++ b/pkg/process/finders/kubernetes/registry.go
@@ -31,15 +31,21 @@ import (
 
 const rsyncPeriod = 5 * time.Minute
 
-type Registry struct {
+type Registry interface {
+       Start(stopChan chan struct{})
+       BuildPodContainers() map[string]*PodContainer
+       FindServiceName(namespace, podName string) string
+}
+
+type StaticNamespaceRegistry struct {
        podInformers     []cache.SharedInformer
        serviceInformers []cache.SharedInformer
 
        podServiceNameCache map[string]string
 }
 
-func NewRegistry(cli *kubernetes.Clientset, namespaces []string, nodeName 
string) *Registry {
-       r := &Registry{
+func NewStaticNamespaceRegistry(cli *kubernetes.Clientset, namespaces 
[]string, nodeName string) Registry {
+       r := &StaticNamespaceRegistry{
                podInformers:        make([]cache.SharedInformer, 0),
                serviceInformers:    make([]cache.SharedInformer, 0),
                podServiceNameCache: make(map[string]string),
@@ -59,14 +65,14 @@ func NewRegistry(cli *kubernetes.Clientset, namespaces 
[]string, nodeName string
        return r
 }
 
-func (r *Registry) Start(stopChan chan struct{}) {
+func (r *StaticNamespaceRegistry) Start(stopChan chan struct{}) {
        for i := range r.podInformers {
                go r.podInformers[i].Run(stopChan)
                go r.serviceInformers[i].Run(stopChan)
        }
 }
 
-func (r *Registry) BuildPodContainers() map[string]*PodContainer {
+func (r *StaticNamespaceRegistry) BuildPodContainers() 
map[string]*PodContainer {
        // cgroupid -> container
        containers := make(map[string]*PodContainer)
        for _, in := range r.podInformers {
@@ -84,11 +90,11 @@ func (r *Registry) BuildPodContainers() 
map[string]*PodContainer {
        return containers
 }
 
-func (r *Registry) FindServiceName(namespace, podName string) string {
+func (r *StaticNamespaceRegistry) FindServiceName(namespace, podName string) 
string {
        return r.podServiceNameCache[namespace+"_"+podName]
 }
 
-func (r *Registry) recomposePodServiceName() {
+func (r *StaticNamespaceRegistry) recomposePodServiceName() {
        result := make(map[string]string)
        for i := range r.podInformers {
                for _, podT := range r.podInformers[i].GetStore().List() {
@@ -134,17 +140,17 @@ func chooseServiceName(a, b string) string {
        return b
 }
 
-func (r *Registry) OnAdd(d interface{}) {
+func (r *StaticNamespaceRegistry) OnAdd(d interface{}) {
        r.recomposePodServiceName()
 }
 
-func (r *Registry) OnUpdate(d, u interface{}) {
+func (r *StaticNamespaceRegistry) OnUpdate(d, u interface{}) {
        same := reflect.DeepEqual(d, u)
        if !same {
                r.recomposePodServiceName()
        }
 }
 
-func (r *Registry) OnDelete(d interface{}) {
+func (r *StaticNamespaceRegistry) OnDelete(d interface{}) {
        r.recomposePodServiceName()
 }

Reply via email to