This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 23e39b44 [operator] Client application request completed
23e39b44 is described below
commit 23e39b44e110370f8942af626d1b397509085c8f
Author: mfordjody <[email protected]>
AuthorDate: Thu Dec 19 11:18:16 2024 +0800
[operator] Client application request completed
---
pkg/kube/client.go | 52 ++++++++++++++++++++++++++++++++++---
pkg/kube/informerfactory/factory.go | 39 ++++++++++++++++++++++++++++
2 files changed, 87 insertions(+), 4 deletions(-)
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 2c354fee..cf77ef51 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -4,6 +4,8 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/operator/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/kube/collections"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
+ "github.com/apache/dubbo-kubernetes/pkg/laziness"
apiextensionsv1
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -11,21 +13,31 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ kubeVersion "k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
+ "k8s.io/client-go/kubernetes"
kubescheme "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
+ "net/http"
+ "time"
)
type client struct {
- dynamic dynamic.Interface
config *rest.Config
revision string
factory *clientFactory
+ version laziness.Laziness[*kubeVersion.Info]
+ informerFactory informerfactory.InformerFactory
restClient *rest.RESTClient
discoveryClient discovery.CachedDiscoveryInterface
+ dynamic dynamic.Interface
+ kube kubernetes.Interface
+ metadata metadata.Interface
mapper meta.ResettableRESTMapper
+ http *http.Client
}
type Client interface {
@@ -66,10 +78,42 @@ func newInternalClient(factory *clientFactory, opts
...ClientOption) (CLIClient,
if err != nil {
return nil, err
}
+ c.kube, err = kubernetes.NewForConfig(c.config)
+ if err != nil {
+ return nil, err
+ }
+ c.metadata, err = metadata.NewForConfig(c.config)
+ if err != nil {
+ return nil, err
+ }
+ c.dynamic, err = dynamic.NewForConfig(c.config)
+ if err != nil {
+ return nil, err
+ }
+ c.informerFactory = informerfactory.NewSharedInformerFactory()
+ c.http = &http.Client{
+ Timeout: time.Second * 15,
+ }
+ var clientWithTimeout kubernetes.Interface
+ clientWithTimeout = c.kube
+ restConfig := c.RESTConfig()
+ if restConfig != nil {
+ restConfig.Timeout = time.Second * 5
+ kubeClient, err := kubernetes.NewForConfig(restConfig)
+ if err == nil {
+ clientWithTimeout = kubeClient
+ }
+ }
+ c.version =
laziness.NewWithRetry(clientWithTimeout.Discovery().ServerVersion)
+ return &c, nil
+}
- c.factory = informerfactory
-
- return nil, err
+func (c *client) RESTConfig() *rest.Config {
+ if c.config == nil {
+ return nil
+ }
+ cpy := *c.config
+ return &cpy
}
var (
diff --git a/pkg/kube/informerfactory/factory.go
b/pkg/kube/informerfactory/factory.go
index c158dbc6..a1b014c7 100644
--- a/pkg/kube/informerfactory/factory.go
+++ b/pkg/kube/informerfactory/factory.go
@@ -1,6 +1,8 @@
package informerfactory
import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -129,3 +131,40 @@ func (f *informerFactory) InformerFor(resource
schema.GroupVersionResource, opts
return f.makeStartableInformer(informer, key)
}
+
+func checkInformerOverlap(inf builtInformer, resource
schema.GroupVersionResource, opts kubetypes.InformerOptions) {
+ if fmt.Sprintf("%p", inf.objectTransform) == fmt.Sprintf("%p",
opts.ObjectTransform) {
+ return
+ }
+}
+
+func (f *informerFactory) makeStartableInformer(informer
cache.SharedIndexInformer, key informerKey) StartableInformer {
+ return StartableInformer{
+ Informer: informer,
+ start: func(stopCh <-chan struct{}) {
+ f.startOne(stopCh, key)
+ },
+ }
+}
+
+func (f *informerFactory) startOne(stopCh <-chan struct{}, informerType
informerKey) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.shuttingDown {
+ return
+ }
+
+ informer, ff := f.informers[informerType]
+ if !ff {
+ panic(fmt.Sprintf("bug: informer key %+v not found",
informerType))
+ }
+ if !f.startedInformers.Contains(informerType) {
+ f.wg.Add(1)
+ go func() {
+ defer f.wg.Done()
+ informer.informer.Run(stopCh)
+ }()
+ f.startedInformers.Insert(informerType)
+ }
+}