This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new ed9dbbb4 [operator] add kube types and factory logic
ed9dbbb4 is described below

commit ed9dbbb40fe06c443953e78517bd2ddbb320bbd2
Author: mfordjody <[email protected]>
AuthorDate: Thu Dec 19 10:59:41 2024 +0800

    [operator] add kube types and factory logic
---
 pkg/kube/informerfactory/factory.go | 107 ++++++++++++++++++++++++++++++++++++
 pkg/kube/kubetypes/types.go         |  17 ++++++
 2 files changed, 124 insertions(+)

diff --git a/pkg/kube/informerfactory/factory.go 
b/pkg/kube/informerfactory/factory.go
index 5252f448..c158dbc6 100644
--- a/pkg/kube/informerfactory/factory.go
+++ b/pkg/kube/informerfactory/factory.go
@@ -1,8 +1,11 @@
 package informerfactory
 
 import (
+       "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "k8s.io/apimachinery/pkg/runtime/schema"
+
        "k8s.io/client-go/tools/cache"
+       "sync"
 )
 
 type NewInformerFunc func() cache.SharedIndexInformer
@@ -19,6 +22,110 @@ type InformerFactory interface {
        Shutdown()
 }
 
+type informerKey struct {
+       gvr           schema.GroupVersionResource
+       labelSelector string
+       fieldSelector string
+       informerType  kubetypes.InformerType
+       namespace     string
+}
+
+type informerFactory struct {
+       lock             sync.Mutex
+       informers        map[informerKey]builtInformer
+       startedInformers sets.Set[informerKey]
+       wg               sync.WaitGroup
+       shuttingDown     bool
+}
+
+type builtInformer struct {
+       informer        cache.SharedIndexInformer
+       objectTransform func(obj any) (any, error)
+}
+
+func NewSharedInformerFactory() InformerFactory {
+       return &informerFactory{
+               informers:        map[informerKey]builtInformer{},
+               startedInformers: sets.New[informerKey](),
+       }
+}
+
 func (s StartableInformer) Start(stopCh <-chan struct{}) {
        s.start(stopCh)
 }
+
+func (f *informerFactory) Start(stopCh <-chan struct{}) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+
+       if f.shuttingDown {
+               return
+       }
+
+       for informerType, informer := range f.informers {
+               if !f.startedInformers.Contains(informerType) {
+                       informer := informer
+                       f.wg.Add(1)
+                       go func() {
+                               defer f.wg.Done()
+                               informer.informer.Run(stopCh)
+                       }()
+                       f.startedInformers.Insert(informerType)
+               }
+       }
+}
+
+func (f *informerFactory) WaitForCacheSync(stopCh <-chan struct{}) bool {
+       informers := func() []cache.SharedIndexInformer {
+               f.lock.Lock()
+               defer f.lock.Unlock()
+               informers := make([]cache.SharedIndexInformer, 0, 
len(f.informers))
+               for informerKey, informer := range f.informers {
+                       if f.startedInformers.Contains(informerKey) {
+                               informers = append(informers, informer.informer)
+                       }
+               }
+               return informers
+       }()
+
+       for _, informer := range informers {
+               if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
+                       return false
+               }
+       }
+       return true
+}
+
+func (f *informerFactory) Shutdown() {
+       defer f.wg.Wait()
+
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       f.shuttingDown = true
+}
+
+func (f *informerFactory) InformerFor(resource schema.GroupVersionResource, 
opts kubetypes.InformerOptions, newFunc NewInformerFunc) StartableInformer {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+
+       key := informerKey{
+               gvr:           resource,
+               labelSelector: opts.LabelSelector,
+               fieldSelector: opts.FieldSelector,
+               informerType:  opts.InformerType,
+               namespace:     opts.Namespace,
+       }
+       inf, exists := f.informers[key]
+       if exists {
+               checkInformerOverlap(inf, resource, opts)
+               return f.makeStartableInformer(inf.informer, key)
+       }
+
+       informer := newFunc()
+       f.informers[key] = builtInformer{
+               informer:        informer,
+               objectTransform: opts.ObjectTransform,
+       }
+
+       return f.makeStartableInformer(informer, key)
+}
diff --git a/pkg/kube/kubetypes/types.go b/pkg/kube/kubetypes/types.go
new file mode 100644
index 00000000..eb93ecab
--- /dev/null
+++ b/pkg/kube/kubetypes/types.go
@@ -0,0 +1,17 @@
+package kubetypes
+
+type InformerOptions struct {
+       LabelSelector   string
+       FieldSelector   string
+       Namespace       string
+       ObjectTransform func(obj any) (any, error)
+       InformerType    InformerType
+}
+
+type InformerType int
+
+const (
+       StandardInformer InformerType = iota
+       DynamicInformer
+       MetadataInformer
+)

Reply via email to