This is an automated email from the ASF dual-hosted git repository.
kichan pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/trafficserver-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 3769bea Watcher changes (#38)
3769bea is described below
commit 3769bea627103e49797f22cafa885d57d40d073e
Author: Rishabh Chhabra <[email protected]>
AuthorDate: Mon Aug 10 16:04:23 2020 -0500
Watcher changes (#38)
* Modifies clientset to use interface to enable testing
* Uses shared informer factory
---
watcher/watcher.go | 58 ++++++++++++++++++++-----------------------------
watcher/watcher_test.go | 21 ++++++++++--------
2 files changed, 35 insertions(+), 44 deletions(-)
diff --git a/watcher/watcher.go b/watcher/watcher.go
index f09a287..f0c451f 100644
--- a/watcher/watcher.go
+++ b/watcher/watcher.go
@@ -77,8 +77,8 @@ func (w *Watcher) Watch() error {
cmHandler := CMHandler{"configmaps", w.Ep}
targetNs := make([]string, 1, 1)
targetNs[0] = w.Ep.ATSManager.(*proxy.ATSManager).Namespace
- err = w.inNamespacesWatchForConfigMaps(&cmHandler, w.Cs.CoreV1().RESTClient(),
- targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+ err = w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+ targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
if err != nil {
return err
}
@@ -88,7 +88,15 @@ func (w *Watcher) Watch() error {
func (w *Watcher) allNamespacesWatchFor(h EventHandler, c cache.Getter,
fieldSelector fields.Selector, objType pkgruntime.Object,
resyncPeriod time.Duration, listerWatcher cache.ListerWatcher) error {
- sharedInformer := cache.NewSharedInformer(listerWatcher, objType, resyncPeriod)
+
+ factory := informers.NewSharedInformerFactory(w.Cs, resyncPeriod)
+ var sharedInformer cache.SharedIndexInformer
+ switch objType.(type) {
+ case *v1.Endpoints:
+ sharedInformer = factory.Core().V1().Endpoints().Informer()
+ case *v1beta1.Ingress:
+ sharedInformer = factory.Extensions().V1beta1().Ingresses().Informer()
+ }
sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: h.Add,
@@ -108,35 +116,6 @@ func (w *Watcher) allNamespacesWatchFor(h EventHandler, c cache.Getter,
// This is meant to make it easier to add resource watchers on resources that
// span multiple namespaces
-func (w *Watcher) inNamespacesWatchForConfigMaps(h EventHandler, c cache.Getter,
- namespaces []string, fieldSelector fields.Selector, objType pkgruntime.Object,
- resyncPeriod time.Duration, clientset kubernetes.Interface) error {
- if len(namespaces) == 0 {
- log.Panic("inNamespacesWatchFor must have at least 1 namespace")
- }
- syncFuncs := make([]cache.InformerSynced, len(namespaces))
- for i, ns := range namespaces {
- factory := informers.NewSharedInformerFactoryWithOptions(clientset, resyncPeriod, informers.WithNamespace(ns))
- cmInfo := factory.Core().V1().ConfigMaps().Informer()
-
- cmInfo.AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: h.Add,
- UpdateFunc: h.Update,
- DeleteFunc: h.Delete,
- })
-
- go cmInfo.Run(w.StopChan)
-
- syncFuncs[i] = cmInfo.HasSynced
- }
- if !cache.WaitForCacheSync(w.StopChan, syncFuncs...) {
- s := fmt.Sprintf("Timed out waiting for %s caches to sync", h.GetResourceName())
- utilruntime.HandleError(fmt.Errorf(s))
- return errors.New(s)
- }
- return nil
-}
-
func (w *Watcher) inNamespacesWatchFor(h EventHandler, c cache.Getter,
namespaces []string, fieldSelector fields.Selector, objType pkgruntime.Object,
resyncPeriod time.Duration) error {
@@ -145,8 +124,17 @@ func (w *Watcher) inNamespacesWatchFor(h EventHandler, c cache.Getter,
}
syncFuncs := make([]cache.InformerSynced, len(namespaces))
for i, ns := range namespaces {
- epListWatch := cache.NewListWatchFromClient(c, h.GetResourceName(), ns, fieldSelector)
- sharedInformer := cache.NewSharedInformer(epListWatch, objType, resyncPeriod)
+ factory := informers.NewSharedInformerFactoryWithOptions(w.Cs, resyncPeriod, informers.WithNamespace(ns))
+
+ var sharedInformer cache.SharedIndexInformer
+ switch objType.(type) {
+ case *v1.Endpoints:
+ sharedInformer = factory.Core().V1().Endpoints().Informer()
+ case *v1beta1.Ingress:
+ sharedInformer = factory.Extensions().V1beta1().Ingresses().Informer()
+ case *v1.ConfigMap:
+ sharedInformer = factory.Core().V1().ConfigMaps().Informer()
+ }
sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: h.Add,
@@ -154,7 +142,7 @@ func (w *Watcher) inNamespacesWatchFor(h EventHandler, c cache.Getter,
DeleteFunc: h.Delete,
})
- go sharedInformer.Run(w.StopChan) // new thread
+ go sharedInformer.Run(w.StopChan)
syncFuncs[i] = sharedInformer.HasSynced
}
diff --git a/watcher/watcher_test.go b/watcher/watcher_test.go
index e5ed740..e4ae13c 100644
--- a/watcher/watcher_test.go
+++ b/watcher/watcher_test.go
@@ -37,7 +37,7 @@ func TestAllNamespacesWatchFor_Add(t *testing.T) {
t.Error(err)
}
- fc.Add(&v1.Endpoints{
+ w.Cs.CoreV1().Endpoints("trafficserver-test-2").Create(&v1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: "testsvc",
Namespace: "trafficserver-test-2",
@@ -62,6 +62,7 @@ func TestAllNamespacesWatchFor_Add(t *testing.T) {
},
},
})
+
time.Sleep(100 * time.Millisecond)
returnedKeys := w.Ep.RedisClient.GetDefaultDBKeyValues()
@@ -83,7 +84,7 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
t.Error(err)
}
- fc.Add(&v1.Endpoints{
+ w.Cs.CoreV1().Endpoints("trafficserver-test-2").Create(&v1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: "testsvc",
Namespace: "trafficserver-test-2",
@@ -108,9 +109,10 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
},
},
})
+
time.Sleep(100 * time.Millisecond)
- fc.Modify(&v1.Endpoints{
+ w.Cs.CoreV1().Endpoints("trafficserver-test-2").Update(&v1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: "testsvc",
Namespace: "trafficserver-test-2",
@@ -135,6 +137,7 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
},
},
})
+
time.Sleep(100 * time.Millisecond)
returnedKeys := w.Ep.RedisClient.GetDefaultDBKeyValues()
@@ -226,8 +229,8 @@ func TestInNamespacesWatchFor_Add(t *testing.T) {
targetNs := make([]string, 1, 1)
targetNs[0] = "trafficserver"
- err := w.inNamespacesWatchForConfigMaps(&cmHandler, w.Cs.CoreV1().RESTClient(),
- targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+ err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+ targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
if err != nil {
t.Error(err)
@@ -278,8 +281,8 @@ func TestInNamespacesWatchFor_Update(t *testing.T) {
targetNs := make([]string, 1, 1)
targetNs[0] = "trafficserver"
- err := w.inNamespacesWatchForConfigMaps(&cmHandler, w.Cs.CoreV1().RESTClient(),
- targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+ err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+ targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
if err != nil {
t.Error(err)
@@ -343,8 +346,8 @@ func TestInNamespacesWatchFor_ShouldNotAdd(t *testing.T) {
targetNs := make([]string, 1, 1)
targetNs[0] = "trafficserver"
- err := w.inNamespacesWatchForConfigMaps(&cmHandler, w.Cs.CoreV1().RESTClient(),
- targetNs, fields.Everything(), &v1.ConfigMap{}, 0, w.Cs)
+ err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
+ targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
if err != nil {
t.Error(err)