tianxiaoliang commented on code in PR #1309:
URL: 
https://github.com/apache/servicecomb-service-center/pull/1309#discussion_r914783074


##########
istio/pkg/controllers/istioconnector/controller.go:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package istioconnector
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/istio/pkg/event"
+       "github.com/apache/servicecomb-service-center/istio/pkg/utils"
+       "github.com/go-chassis/cari/discovery"
+       "istio.io/client-go/pkg/apis/networking/v1alpha3"
+       "istio.io/client-go/pkg/clientset/versioned"
+       "istio.io/pkg/log"
+       v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/client-go/rest"
+       "k8s.io/client-go/tools/clientcmd"
+       k8s "sigs.k8s.io/controller-runtime/pkg/client/config"
+)
+
+// Controller receives service center updates and pushes converted Istio 
ServiceEntry(s) to k8s api server
+type Controller struct {
+       // Istio istioClient for k8s API
+       istioClient *versioned.Clientset
+       // Channel used to send and receive service center change events from 
the service center controller
+       event chan []event.ChangeEvent
+       // Lock for service cache
+       cacheMutex sync.Mutex
+       // Cache of converted service entries, mapped to original service 
center service id
+       convertedServiceCache map[string]*v1alpha3.ServiceEntry
+}
+
+func NewController(kubeconfigPath string, e chan []event.ChangeEvent) 
(*Controller, error) {
+       controller := &Controller{
+               event:                 e,
+               convertedServiceCache: make(map[string]*v1alpha3.ServiceEntry),
+       }
+
+       // get kubernetes config info, used for creating k8s client
+       client, err := newKubeClient(kubeconfigPath)
+       if err != nil {
+               log.Errorf("Failed to create istio client: %v\n", err)
+               return nil, err
+       }
+       controller.istioClient = client
+       return controller, nil
+}
+
+// Return a debounced version of a function `fn` that will not run until 
`wait` seconds have passed
+// after it was last called or until `maxWait` seconds have passed since its 
first call.
+// Once `fn` is executed, the max wait timer is reset.
+func debounce(fn func(), wait time.Duration, maxWait time.Duration) func() {
+       // Main timer, time seconds elapsed since last execution
+       var timer *time.Timer
+       // Max wait timer, time seconds elapsed since first call
+       var maxTimer *time.Timer
+       return func() {
+               if maxTimer == nil {
+                       // First debounced event, start max wait timer
+                       // will only run target func if not called again after 
`maxWait` duration
+                       maxTimer = time.AfterFunc(maxWait, func() {
+                               // Reset all timers when max wait time is 
reached
+                               log.Debugf("Debounce: maximum wait time 
reached, running target fn\n")
+                               if timer.Stop() {
+                                       // Only run target func if main timer 
hasn't already
+                                       fn()
+                               }
+                               timer = nil
+                               maxTimer = nil
+                       })
+                       log.Debugf("Debounce: max timer started, will wait max 
time of %s\n", maxWait)
+               }
+               if timer != nil {
+                       // Timer already started; function was called within 
`wait` duration, debounce this event by resetting timer
+                       timer.Stop()
+               }
+               // Start timer, will only run target func if not called again 
after `wait` duration
+               timer = time.AfterFunc(wait, func() {
+                       log.Debugf("Debounce: timer completed, running target 
fn\n")
+                       // Reset all timers and run target func when wait time 
is reached
+                       fn()
+                       maxTimer.Stop()
+                       maxTimer = nil
+                       timer = nil
+               })
+               log.Debugf("Debounce: timer started, will wait %s\n", wait)
+       }
+}
+
+// Run until a signal is received, this function won't block
+func (c *Controller) Run(stop <-chan struct{}) {
+       go c.watchServiceCenterUpdate(stop)
+}
+
+func (c *Controller) Stop() {
+
+}
+
+// Return a debounced version of the push2istio method that merges the passed 
events on each call.
+func (c *Controller) getIstioPushDebouncer(wait time.Duration, maxWait 
time.Duration, maxEvents int) func([]event.ChangeEvent) {
+       var eventQueue []event.ChangeEvent // Queue of events merged from 
arguments of each call to debounced function
+       // Make a debounced version of push2istio, with provided wait and 
maxWait times
+       debouncedFn := debounce(func() {
+               log.Debugf("Debounce: push callback fired, pushing events to 
Istio: %v\n", eventQueue)
+               // Timeout reached, push events to istio and reset queue
+               c.push2Istio(eventQueue)
+               eventQueue = nil
+       }, wait, maxWait)
+       return func(newEvents []event.ChangeEvent) {
+               log.Debugf("Debounce: received and merged %d new events\n", 
len(newEvents))
+               // Merge new events with existing event queue for each received 
call
+               eventQueue = append(eventQueue, newEvents...)
+
+               log.Debugf("Debounce: new total number of events in queue is 
%d\n", len(eventQueue))

Review Comment:
   starts with lower case please



##########
istio/pkg/controllers/istioconnector/controller.go:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package istioconnector
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/istio/pkg/event"
+       "github.com/apache/servicecomb-service-center/istio/pkg/utils"
+       "github.com/go-chassis/cari/discovery"
+       "istio.io/client-go/pkg/apis/networking/v1alpha3"
+       "istio.io/client-go/pkg/clientset/versioned"
+       "istio.io/pkg/log"
+       v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/client-go/rest"
+       "k8s.io/client-go/tools/clientcmd"
+       k8s "sigs.k8s.io/controller-runtime/pkg/client/config"
+)
+
+// Controller receives service center updates and pushes converted Istio 
ServiceEntry(s) to k8s api server
+type Controller struct {
+       // Istio istioClient for k8s API
+       istioClient *versioned.Clientset
+       // Channel used to send and receive service center change events from 
the service center controller
+       event chan []event.ChangeEvent
+       // Lock for service cache
+       cacheMutex sync.Mutex
+       // Cache of converted service entries, mapped to original service 
center service id
+       convertedServiceCache map[string]*v1alpha3.ServiceEntry
+}
+
+func NewController(kubeconfigPath string, e chan []event.ChangeEvent) 
(*Controller, error) {
+       controller := &Controller{
+               event:                 e,
+               convertedServiceCache: make(map[string]*v1alpha3.ServiceEntry),
+       }
+
+       // get kubernetes config info, used for creating k8s client
+       client, err := newKubeClient(kubeconfigPath)
+       if err != nil {
+               log.Errorf("Failed to create istio client: %v\n", err)
+               return nil, err
+       }
+       controller.istioClient = client
+       return controller, nil
+}
+
+// Return a debounced version of a function `fn` that will not run until 
`wait` seconds have passed
+// after it was last called or until `maxWait` seconds have passed since its 
first call.
+// Once `fn` is executed, the max wait timer is reset.
+func debounce(fn func(), wait time.Duration, maxWait time.Duration) func() {
+       // Main timer, time seconds elapsed since last execution
+       var timer *time.Timer
+       // Max wait timer, time seconds elapsed since first call
+       var maxTimer *time.Timer
+       return func() {
+               if maxTimer == nil {
+                       // First debounced event, start max wait timer
+                       // will only run target func if not called again after 
`maxWait` duration
+                       maxTimer = time.AfterFunc(maxWait, func() {
+                               // Reset all timers when max wait time is 
reached
+                               log.Debugf("Debounce: maximum wait time 
reached, running target fn\n")
+                               if timer.Stop() {
+                                       // Only run target func if main timer 
hasn't already
+                                       fn()
+                               }
+                               timer = nil
+                               maxTimer = nil
+                       })
+                       log.Debugf("Debounce: max timer started, will wait max 
time of %s\n", maxWait)
+               }
+               if timer != nil {
+                       // Timer already started; function was called within 
`wait` duration, debounce this event by resetting timer
+                       timer.Stop()
+               }
+               // Start timer, will only run target func if not called again 
after `wait` duration
+               timer = time.AfterFunc(wait, func() {
+                       log.Debugf("Debounce: timer completed, running target 
fn\n")
+                       // Reset all timers and run target func when wait time 
is reached
+                       fn()
+                       maxTimer.Stop()
+                       maxTimer = nil
+                       timer = nil
+               })
+               log.Debugf("Debounce: timer started, will wait %s\n", wait)
+       }
+}
+
+// Run until a signal is received, this function won't block
+func (c *Controller) Run(stop <-chan struct{}) {
+       go c.watchServiceCenterUpdate(stop)
+}
+
+func (c *Controller) Stop() {
+

Review Comment:
   why not implement



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to