This is an automated email from the ASF dual-hosted git repository.

kichan pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/trafficserver-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 3769bea  Watcher changes (#38)
3769bea is described below

commit 3769bea627103e49797f22cafa885d57d40d073e
Author: Rishabh Chhabra <[email protected]>
AuthorDate: Mon Aug 10 16:04:23 2020 -0500

    Watcher changes (#38)
    
    * Modifies clientset to use interface to enable testing
    
    * Uses shared informer factory
---
 watcher/watcher.go      | 58 ++++++++++++++++++++-----------------------------
 watcher/watcher_test.go | 21 ++++++++++--------
 2 files changed, 35 insertions(+), 44 deletions(-)

diff --git a/watcher/watcher.go b/watcher/watcher.go
index f09a287..f0c451f 100644
--- a/watcher/watcher.go
+++ b/watcher/watcher.go
@@ -77,8 +77,8 @@ func (w *Watcher) Watch() error {
        cmHandler := CMHandler{"configmaps", w.Ep}
        targetNs := make([]string, 1, 1)
        targetNs[0] = w.Ep.ATSManager.(*proxy.ATSManager).Namespace
-       err = w.inNamespacesWatchForConfigMaps(&cmHandler, 
w.Cs.CoreV1().RESTClient(),
-               targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+       err = w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+               targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
        if err != nil {
                return err
        }
@@ -88,7 +88,15 @@ func (w *Watcher) Watch() error {
 func (w *Watcher) allNamespacesWatchFor(h EventHandler, c cache.Getter,
        fieldSelector fields.Selector, objType pkgruntime.Object,
        resyncPeriod time.Duration, listerWatcher cache.ListerWatcher) error {
-       sharedInformer := cache.NewSharedInformer(listerWatcher, objType, 
resyncPeriod)
+
+       factory := informers.NewSharedInformerFactory(w.Cs, resyncPeriod)
+       var sharedInformer cache.SharedIndexInformer
+       switch objType.(type) {
+       case *v1.Endpoints:
+               sharedInformer = factory.Core().V1().Endpoints().Informer()
+       case *v1beta1.Ingress:
+               sharedInformer = 
factory.Extensions().V1beta1().Ingresses().Informer()
+       }
 
        sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    h.Add,
@@ -108,35 +116,6 @@ func (w *Watcher) allNamespacesWatchFor(h EventHandler, c 
cache.Getter,
 
 // This is meant to make it easier to add resource watchers on resources that
 // span multiple namespaces
-func (w *Watcher) inNamespacesWatchForConfigMaps(h EventHandler, c 
cache.Getter,
-       namespaces []string, fieldSelector fields.Selector, objType 
pkgruntime.Object,
-       resyncPeriod time.Duration, clientset kubernetes.Interface) error {
-       if len(namespaces) == 0 {
-               log.Panic("inNamespacesWatchFor must have at least 1 namespace")
-       }
-       syncFuncs := make([]cache.InformerSynced, len(namespaces))
-       for i, ns := range namespaces {
-               factory := 
informers.NewSharedInformerFactoryWithOptions(clientset, resyncPeriod, 
informers.WithNamespace(ns))
-               cmInfo := factory.Core().V1().ConfigMaps().Informer()
-
-               cmInfo.AddEventHandler(cache.ResourceEventHandlerFuncs{
-                       AddFunc:    h.Add,
-                       UpdateFunc: h.Update,
-                       DeleteFunc: h.Delete,
-               })
-
-               go cmInfo.Run(w.StopChan)
-
-               syncFuncs[i] = cmInfo.HasSynced
-       }
-       if !cache.WaitForCacheSync(w.StopChan, syncFuncs...) {
-               s := fmt.Sprintf("Timed out waiting for %s caches to sync", 
h.GetResourceName())
-               utilruntime.HandleError(fmt.Errorf(s))
-               return errors.New(s)
-       }
-       return nil
-}
-
 func (w *Watcher) inNamespacesWatchFor(h EventHandler, c cache.Getter,
        namespaces []string, fieldSelector fields.Selector, objType 
pkgruntime.Object,
        resyncPeriod time.Duration) error {
@@ -145,8 +124,17 @@ func (w *Watcher) inNamespacesWatchFor(h EventHandler, c 
cache.Getter,
        }
        syncFuncs := make([]cache.InformerSynced, len(namespaces))
        for i, ns := range namespaces {
-               epListWatch := cache.NewListWatchFromClient(c, 
h.GetResourceName(), ns, fieldSelector)
-               sharedInformer := cache.NewSharedInformer(epListWatch, objType, 
resyncPeriod)
+               factory := informers.NewSharedInformerFactoryWithOptions(w.Cs, 
resyncPeriod, informers.WithNamespace(ns))
+
+               var sharedInformer cache.SharedIndexInformer
+               switch objType.(type) {
+               case *v1.Endpoints:
+                       sharedInformer = 
factory.Core().V1().Endpoints().Informer()
+               case *v1beta1.Ingress:
+                       sharedInformer = 
factory.Extensions().V1beta1().Ingresses().Informer()
+               case *v1.ConfigMap:
+                       sharedInformer = 
factory.Core().V1().ConfigMaps().Informer()
+               }
 
                sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                        AddFunc:    h.Add,
@@ -154,7 +142,7 @@ func (w *Watcher) inNamespacesWatchFor(h EventHandler, c 
cache.Getter,
                        DeleteFunc: h.Delete,
                })
 
-               go sharedInformer.Run(w.StopChan) // new thread
+               go sharedInformer.Run(w.StopChan)
 
                syncFuncs[i] = sharedInformer.HasSynced
        }
diff --git a/watcher/watcher_test.go b/watcher/watcher_test.go
index e5ed740..e4ae13c 100644
--- a/watcher/watcher_test.go
+++ b/watcher/watcher_test.go
@@ -37,7 +37,7 @@ func TestAllNamespacesWatchFor_Add(t *testing.T) {
                t.Error(err)
        }
 
-       fc.Add(&v1.Endpoints{
+       w.Cs.CoreV1().Endpoints("trafficserver-test-2").Create(&v1.Endpoints{
                ObjectMeta: meta_v1.ObjectMeta{
                        Name:      "testsvc",
                        Namespace: "trafficserver-test-2",
@@ -62,6 +62,7 @@ func TestAllNamespacesWatchFor_Add(t *testing.T) {
                        },
                },
        })
+
        time.Sleep(100 * time.Millisecond)
 
        returnedKeys := w.Ep.RedisClient.GetDefaultDBKeyValues()
@@ -83,7 +84,7 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
                t.Error(err)
        }
 
-       fc.Add(&v1.Endpoints{
+       w.Cs.CoreV1().Endpoints("trafficserver-test-2").Create(&v1.Endpoints{
                ObjectMeta: meta_v1.ObjectMeta{
                        Name:      "testsvc",
                        Namespace: "trafficserver-test-2",
@@ -108,9 +109,10 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
                        },
                },
        })
+
        time.Sleep(100 * time.Millisecond)
 
-       fc.Modify(&v1.Endpoints{
+       w.Cs.CoreV1().Endpoints("trafficserver-test-2").Update(&v1.Endpoints{
                ObjectMeta: meta_v1.ObjectMeta{
                        Name:      "testsvc",
                        Namespace: "trafficserver-test-2",
@@ -135,6 +137,7 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
                        },
                },
        })
+
        time.Sleep(100 * time.Millisecond)
 
        returnedKeys := w.Ep.RedisClient.GetDefaultDBKeyValues()
@@ -226,8 +229,8 @@ func TestInNamespacesWatchFor_Add(t *testing.T) {
        targetNs := make([]string, 1, 1)
        targetNs[0] = "trafficserver"
 
-       err := w.inNamespacesWatchForConfigMaps(&cmHandler, 
w.Cs.CoreV1().RESTClient(),
-               targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+       err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+               targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
 
        if err != nil {
                t.Error(err)
@@ -278,8 +281,8 @@ func TestInNamespacesWatchFor_Update(t *testing.T) {
        targetNs := make([]string, 1, 1)
        targetNs[0] = "trafficserver"
 
-       err := w.inNamespacesWatchForConfigMaps(&cmHandler, 
w.Cs.CoreV1().RESTClient(),
-               targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+       err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+               targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
 
        if err != nil {
                t.Error(err)
@@ -343,8 +346,8 @@ func TestInNamespacesWatchFor_ShouldNotAdd(t *testing.T) {
        targetNs := make([]string, 1, 1)
        targetNs[0] = "trafficserver"
 
-       err := w.inNamespacesWatchForConfigMaps(&cmHandler, 
w.Cs.CoreV1().RESTClient(),
-               targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+       err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+               targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
 
        if err != nil {
                t.Error(err)

Reply via email to