This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new ad26ea9 Redefine the Kubernetes process registry (#189)
ad26ea9 is described below
commit ad26ea9567553f9a4edde198bc8fc50703f5fec9
Author: mrproliu <[email protected]>
AuthorDate: Fri Apr 11 14:06:38 2025 +0800
Redefine the Kubernetes process registry (#189)
---
pkg/process/finders/kubernetes/container.go | 4 ++--
pkg/process/finders/kubernetes/finder.go | 29 +++++++++++++++++------------
pkg/process/finders/kubernetes/registry.go | 26 ++++++++++++++++----------
3 files changed, 35 insertions(+), 24 deletions(-)
diff --git a/pkg/process/finders/kubernetes/container.go
b/pkg/process/finders/kubernetes/container.go
index 3ed9c38..c5d0359 100644
--- a/pkg/process/finders/kubernetes/container.go
+++ b/pkg/process/finders/kubernetes/container.go
@@ -39,11 +39,11 @@ type PodContainer struct {
ContainerStatus v1.ContainerStatus
// the kubernetes resource registry
- registry *Registry
+ registry Registry
}
// AnalyzeContainers means query the containers by pod
-func AnalyzeContainers(pod *v1.Pod, registry *Registry) []*PodContainer {
+func AnalyzeContainers(pod *v1.Pod, registry Registry) []*PodContainer {
containers := make([]*PodContainer, 0)
// nolint
for _, cs := range pod.Status.ContainerStatuses {
diff --git a/pkg/process/finders/kubernetes/finder.go
b/pkg/process/finders/kubernetes/finder.go
index 58282ea..fd70936 100644
--- a/pkg/process/finders/kubernetes/finder.go
+++ b/pkg/process/finders/kubernetes/finder.go
@@ -76,18 +76,30 @@ type ProcessFinder struct {
// k8s clients
k8sConfig *rest.Config
- registry *Registry
+ registry Registry
CLI *kubernetes.Clientset
- // runtime config
- namespaces []string
-
// for IsPodIP check
podIPChecker *cache.Expiring
podIPMutexes map[int]*sync.Mutex
}
func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig,
manager base.ProcessManager) error {
+ return f.InitWithRegistry(ctx, conf, manager, func(clientset
*kubernetes.Clientset) Registry {
+ config := conf.(*Config)
+ var namespaces []string
+ // namespace update
+ if config.Namespaces != "" {
+ namespaces = strings.Split(config.Namespaces, ",")
+ } else {
+ namespaces = []string{v1.NamespaceAll}
+ }
+ return NewStaticNamespaceRegistry(clientset, namespaces,
config.NodeName)
+ })
+}
+
+func (f *ProcessFinder) InitWithRegistry(ctx context.Context, conf
base.FinderBaseConfig, manager base.ProcessManager,
+ registrySupplier func(*kubernetes.Clientset) Registry) error {
f.clusterName =
manager.GetModuleManager().FindModule(core.ModuleName).(core.Operator).ClusterName()
k8sConf, cli, err := f.validateConfig(ctx, conf.(*Config))
if err != nil {
@@ -99,7 +111,7 @@ func (f *ProcessFinder) Init(ctx context.Context, conf
base.FinderBaseConfig, ma
f.ctx, f.cancelCtx = context.WithCancel(ctx)
f.stopChan = make(chan struct{}, 1)
- f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
+ f.registry = registrySupplier(cli)
f.manager = manager
f.podIPChecker = cache.NewExpiring()
f.podIPMutexes = make(map[int]*sync.Mutex)
@@ -132,13 +144,6 @@ func (f *ProcessFinder) validateConfig(ctx
context.Context, conf *Config) (*rest
return nil, nil, fmt.Errorf("could not found the node: %s, %v",
conf.NodeName, err)
}
- // namespace update
- if conf.Namespaces != "" {
- f.namespaces = strings.Split(conf.Namespaces, ",")
- } else {
- f.namespaces = []string{v1.NamespaceAll}
- }
-
// process builders
if err := ProcessBuildersInit(conf.Analyzers); err != nil {
return nil, nil, err
diff --git a/pkg/process/finders/kubernetes/registry.go
b/pkg/process/finders/kubernetes/registry.go
index e2d464d..3e0123c 100644
--- a/pkg/process/finders/kubernetes/registry.go
+++ b/pkg/process/finders/kubernetes/registry.go
@@ -31,15 +31,21 @@ import (
const rsyncPeriod = 5 * time.Minute
-type Registry struct {
+type Registry interface {
+ Start(stopChan chan struct{})
+ BuildPodContainers() map[string]*PodContainer
+ FindServiceName(namespace, podName string) string
+}
+
+type StaticNamespaceRegistry struct {
podInformers []cache.SharedInformer
serviceInformers []cache.SharedInformer
podServiceNameCache map[string]string
}
-func NewRegistry(cli *kubernetes.Clientset, namespaces []string, nodeName
string) *Registry {
- r := &Registry{
+func NewStaticNamespaceRegistry(cli *kubernetes.Clientset, namespaces
[]string, nodeName string) Registry {
+ r := &StaticNamespaceRegistry{
podInformers: make([]cache.SharedInformer, 0),
serviceInformers: make([]cache.SharedInformer, 0),
podServiceNameCache: make(map[string]string),
@@ -59,14 +65,14 @@ func NewRegistry(cli *kubernetes.Clientset, namespaces
[]string, nodeName string
return r
}
-func (r *Registry) Start(stopChan chan struct{}) {
+func (r *StaticNamespaceRegistry) Start(stopChan chan struct{}) {
for i := range r.podInformers {
go r.podInformers[i].Run(stopChan)
go r.serviceInformers[i].Run(stopChan)
}
}
-func (r *Registry) BuildPodContainers() map[string]*PodContainer {
+func (r *StaticNamespaceRegistry) BuildPodContainers()
map[string]*PodContainer {
// cgroupid -> container
containers := make(map[string]*PodContainer)
for _, in := range r.podInformers {
@@ -84,11 +90,11 @@ func (r *Registry) BuildPodContainers()
map[string]*PodContainer {
return containers
}
-func (r *Registry) FindServiceName(namespace, podName string) string {
+func (r *StaticNamespaceRegistry) FindServiceName(namespace, podName string)
string {
return r.podServiceNameCache[namespace+"_"+podName]
}
-func (r *Registry) recomposePodServiceName() {
+func (r *StaticNamespaceRegistry) recomposePodServiceName() {
result := make(map[string]string)
for i := range r.podInformers {
for _, podT := range r.podInformers[i].GetStore().List() {
@@ -134,17 +140,17 @@ func chooseServiceName(a, b string) string {
return b
}
-func (r *Registry) OnAdd(d interface{}) {
+func (r *StaticNamespaceRegistry) OnAdd(d interface{}) {
r.recomposePodServiceName()
}
-func (r *Registry) OnUpdate(d, u interface{}) {
+func (r *StaticNamespaceRegistry) OnUpdate(d, u interface{}) {
same := reflect.DeepEqual(d, u)
if !same {
r.recomposePodServiceName()
}
}
-func (r *Registry) OnDelete(d interface{}) {
+func (r *StaticNamespaceRegistry) OnDelete(d interface{}) {
r.recomposePodServiceName()
}