This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new 23e39b44 [operator] Client application request completed
23e39b44 is described below

commit 23e39b44e110370f8942af626d1b397509085c8f
Author: mfordjody <[email protected]>
AuthorDate: Thu Dec 19 11:18:16 2024 +0800

    [operator] Client application request completed
---
 pkg/kube/client.go                  | 52 ++++++++++++++++++++++++++++++++++---
 pkg/kube/informerfactory/factory.go | 39 ++++++++++++++++++++++++++++
 2 files changed, 87 insertions(+), 4 deletions(-)

diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 2c354fee..cf77ef51 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -4,6 +4,8 @@ import (
        "fmt"
        "github.com/apache/dubbo-kubernetes/operator/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/kube/collections"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
+       "github.com/apache/dubbo-kubernetes/pkg/laziness"
        apiextensionsv1 
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
        "k8s.io/apimachinery/pkg/api/meta"
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -11,21 +13,31 @@ import (
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/runtime/serializer"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+       kubeVersion "k8s.io/apimachinery/pkg/version"
        "k8s.io/client-go/discovery"
        "k8s.io/client-go/dynamic"
+       "k8s.io/client-go/kubernetes"
        kubescheme "k8s.io/client-go/kubernetes/scheme"
+       "k8s.io/client-go/metadata"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/clientcmd"
+       "net/http"
+       "time"
 )
 
 type client struct {
-       dynamic         dynamic.Interface
        config          *rest.Config
        revision        string
        factory         *clientFactory
+       version         laziness.Laziness[*kubeVersion.Info]
+       informerFactory informerfactory.InformerFactory
        restClient      *rest.RESTClient
        discoveryClient discovery.CachedDiscoveryInterface
+       dynamic         dynamic.Interface
+       kube            kubernetes.Interface
+       metadata        metadata.Interface
        mapper          meta.ResettableRESTMapper
+       http            *http.Client
 }
 
 type Client interface {
@@ -66,10 +78,42 @@ func newInternalClient(factory *clientFactory, opts 
...ClientOption) (CLIClient,
        if err != nil {
                return nil, err
        }
+       c.kube, err = kubernetes.NewForConfig(c.config)
+       if err != nil {
+               return nil, err
+       }
+       c.metadata, err = metadata.NewForConfig(c.config)
+       if err != nil {
+               return nil, err
+       }
+       c.dynamic, err = dynamic.NewForConfig(c.config)
+       if err != nil {
+               return nil, err
+       }
+       c.informerFactory = informerfactory.NewSharedInformerFactory()
+       c.http = &http.Client{
+               Timeout: time.Second * 15,
+       }
+       var clientWithTimeout kubernetes.Interface
+       clientWithTimeout = c.kube
+       restConfig := c.RESTConfig()
+       if restConfig != nil {
+               restConfig.Timeout = time.Second * 5
+               kubeClient, err := kubernetes.NewForConfig(restConfig)
+               if err == nil {
+                       clientWithTimeout = kubeClient
+               }
+       }
+       c.version = 
laziness.NewWithRetry(clientWithTimeout.Discovery().ServerVersion)
+       return &c, nil
+}
 
-       c.factory = informerfactory
-
-       return nil, err
+func (c *client) RESTConfig() *rest.Config {
+       if c.config == nil {
+               return nil
+       }
+       cpy := *c.config
+       return &cpy
 }
 
 var (
diff --git a/pkg/kube/informerfactory/factory.go 
b/pkg/kube/informerfactory/factory.go
index c158dbc6..a1b014c7 100644
--- a/pkg/kube/informerfactory/factory.go
+++ b/pkg/kube/informerfactory/factory.go
@@ -1,6 +1,8 @@
 package informerfactory
 
 import (
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
        "github.com/apache/dubbo-kubernetes/pkg/util/sets"
        "k8s.io/apimachinery/pkg/runtime/schema"
 
@@ -129,3 +131,40 @@ func (f *informerFactory) InformerFor(resource 
schema.GroupVersionResource, opts
 
        return f.makeStartableInformer(informer, key)
 }
+
+func checkInformerOverlap(inf builtInformer, resource 
schema.GroupVersionResource, opts kubetypes.InformerOptions) {
+       if fmt.Sprintf("%p", inf.objectTransform) == fmt.Sprintf("%p", 
opts.ObjectTransform) {
+               return
+       }
+}
+
+func (f *informerFactory) makeStartableInformer(informer 
cache.SharedIndexInformer, key informerKey) StartableInformer {
+       return StartableInformer{
+               Informer: informer,
+               start: func(stopCh <-chan struct{}) {
+                       f.startOne(stopCh, key)
+               },
+       }
+}
+
+func (f *informerFactory) startOne(stopCh <-chan struct{}, informerType 
informerKey) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+
+       if f.shuttingDown {
+               return
+       }
+
+       informer, ff := f.informers[informerType]
+       if !ff {
+               panic(fmt.Sprintf("bug: informer key %+v not found", 
informerType))
+       }
+       if !f.startedInformers.Contains(informerType) {
+               f.wg.Add(1)
+               go func() {
+                       defer f.wg.Done()
+                       informer.informer.Run(stopCh)
+               }()
+               f.startedInformers.Insert(informerType)
+       }
+}

Reply via email to